diff --git a/bip380/descriptors/__init__.py b/bip380/descriptors/__init__.py index b5070a5..b26c79d 100644 --- a/bip380/descriptors/__init__.py +++ b/bip380/descriptors/__init__.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +import typing + from bip380.key import DescriptorKey from bip380.miniscript import Node from bip380.utils.hashes import sha256, hash160 @@ -8,15 +12,17 @@ OP_EQUALVERIFY, OP_CHECKSIG, ) - from .checksum import descsum_create from .parsing import descriptor_from_str +if typing.TYPE_CHECKING: + from bip380.miniscript import SatisfactionMaterial + class Descriptor: """A Bitcoin Output Script Descriptor.""" - def from_str(desc_str, strict=False): + def from_str(desc_str: str, strict: bool = False) -> Descriptor: """Parse a Bitcoin Output Script Descriptor from its string representation. :param strict: whether to require the presence of a checksum. @@ -24,24 +30,24 @@ def from_str(desc_str, strict=False): return descriptor_from_str(desc_str, strict) @property - def script_pubkey(self): + def script_pubkey(self) -> CScript: """Get the ScriptPubKey (output 'locking' Script) for this descriptor.""" # To be implemented by derived classes raise NotImplementedError @property - def script_sighash(self): + def script_sighash(self) -> CScript: """Get the Script to be committed to by the signature hash of a spending transaction.""" # To be implemented by derived classes raise NotImplementedError @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: """Get the list of all keys from this descriptor, in order of apparition.""" # To be implemented by derived classes raise NotImplementedError - def derive(self, index): + def derive(self, index: int) -> None: """Derive the key at the given derivation index. A no-op if the key isn't a wildcard. Will start from 2**31 if the key is a "hardened @@ -65,27 +71,27 @@ def satisfy(self, *args, **kwargs): class WshDescriptor(Descriptor): """A Segwit v0 P2WSH Output Script Descriptor.""" - def __init__(self, witness_script): + def __init__(self, witness_script: Node): assert isinstance(witness_script, Node) - self.witness_script = witness_script + self.witness_script: Node = witness_script - def __repr__(self): + def __repr__(self) -> str: return descsum_create(f"wsh({self.witness_script})") @property - def script_pubkey(self): + def script_pubkey(self) -> CScript: witness_program = sha256(self.witness_script.script) return CScript([0, witness_program]) @property - def script_sighash(self): + def script_sighash(self) -> CScript: return self.witness_script.script @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: return self.witness_script.keys - def satisfy(self, sat_material=None): + def satisfy(self, sat_material: SatisfactionMaterial = None) -> typing.List[Node]: """Get the witness stack to spend from this descriptor. :param sat_material: a miniscript.satisfaction.SatisfactionMaterial with data @@ -99,28 +105,28 @@ def satisfy(self, sat_material=None): class WpkhDescriptor(Descriptor): """A Segwit v0 P2WPKH Output Script Descriptor.""" - def __init__(self, pubkey): + def __init__(self, pubkey: DescriptorKey): assert isinstance(pubkey, DescriptorKey) self.pubkey = pubkey - def __repr__(self): + def __repr__(self) -> str: return descsum_create(f"wpkh({self.pubkey})") @property - def script_pubkey(self): + def script_pubkey(self) -> CScript: witness_program = hash160(self.pubkey.bytes()) return CScript([0, witness_program]) @property - def script_sighash(self): + def script_sighash(self) -> CScript: key_hash = hash160(self.pubkey.bytes()) return CScript([OP_DUP, OP_HASH160, key_hash, OP_EQUALVERIFY, OP_CHECKSIG]) @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: return [self.pubkey] - def satisfy(self, signature): + def satisfy(self, signature: bytes) -> typing.List[bytes]: """Get the witness stack to spend from this descriptor. :param signature: a signature (in bytes) for the pubkey from the descriptor. diff --git a/bip380/descriptors/checksum.py b/bip380/descriptors/checksum.py index 9f3e013..464873d 100644 --- a/bip380/descriptors/checksum.py +++ b/bip380/descriptors/checksum.py @@ -5,13 +5,14 @@ """Utility functions related to output descriptors""" import re +import typing INPUT_CHARSET = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ " CHECKSUM_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" GENERATOR = [0xF5DEE51989, 0xA9FDCA3312, 0x1BAB10E32D, 0x3706B1677A, 0x644D626FFD] -def descsum_polymod(symbols): +def descsum_polymod(symbols: typing.List[int]) -> int: """Internal function that computes the descriptor checksum.""" chk = 1 for value in symbols: @@ -22,7 +23,7 @@ def descsum_polymod(symbols): return chk -def descsum_expand(s): +def descsum_expand(s: str) -> typing.Optional[typing.List[int]]: """Internal function that does the character to symbol expansion""" groups = [] symbols = [] @@ -42,7 +43,7 @@ def descsum_expand(s): return symbols -def descsum_create(s): +def descsum_create(s: str) -> str: """Add a checksum to a descriptor without""" symbols = descsum_expand(s) + [0, 0, 0, 0, 0, 0, 0, 0] checksum = descsum_polymod(symbols) ^ 1 @@ -53,7 +54,7 @@ def descsum_create(s): ) -def descsum_check(s): +def descsum_check(s: str) -> bool: """Verify that the checksum is correct in a descriptor""" if s[-9] != "#": return False @@ -63,7 +64,7 @@ def descsum_check(s): return descsum_polymod(symbols) == 1 -def drop_origins(s): +def drop_origins(s: str) -> str: """Drop the key origins from a descriptor""" desc = re.sub(r"\[.+?\]", "", s) if "#" in s: diff --git a/bip380/descriptors/errors.py b/bip380/descriptors/errors.py index f7b5848..cc5c349 100644 --- a/bip380/descriptors/errors.py +++ b/bip380/descriptors/errors.py @@ -1,5 +1,5 @@ class DescriptorParsingError(ValueError): """Error while parsing a Bitcoin Output Descriptor from its string representation""" - def __init__(self, message): - self.message = message + def __init__(self, message: str): + self.message: str = message diff --git a/bip380/descriptors/parsing.py b/bip380/descriptors/parsing.py index 20ca75a..d83fd80 100644 --- a/bip380/descriptors/parsing.py +++ b/bip380/descriptors/parsing.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import bip380.descriptors as descriptors from bip380.key import DescriptorKey, DescriptorKeyError @@ -7,7 +9,7 @@ from .errors import DescriptorParsingError -def split_checksum(desc_str, strict=False): +def split_checksum(desc_str: str, strict: bool = False) -> str: """Removes and check the provided checksum. If not told otherwise, this won't fail on a missing checksum. @@ -28,7 +30,7 @@ def split_checksum(desc_str, strict=False): return descriptor -def descriptor_from_str(desc_str, strict=False): +def descriptor_from_str(desc_str: str, strict: bool = False) -> descriptors.Descriptor: """Parse a Bitcoin Output Script Descriptor from its string representation. :param strict: whether to require the presence of a checksum. diff --git a/bip380/key.py b/bip380/key.py index ae50eda..98e4ad5 100644 --- a/bip380/key.py +++ b/bip380/key.py @@ -1,12 +1,17 @@ +from __future__ import annotations + +import typing +from enum import Enum, auto + from bip32 import BIP32 from bip32.utils import coincurve, _deriv_path_str_to_list + from bip380.utils.hashes import hash160 -from enum import Enum, auto class DescriptorKeyError(Exception): - def __init__(self, message): - self.message = message + def __init__(self, message: str): + self.message: str = message class DescriporKeyOrigin: @@ -15,13 +20,13 @@ class DescriporKeyOrigin: See https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions. """ - def __init__(self, fingerprint, path): + def __init__(self, fingerprint: bytes, path: typing.List[int]): assert isinstance(fingerprint, bytes) and isinstance(path, list) - self.fingerprint = fingerprint - self.path = path + self.fingerprint: bytes = fingerprint + self.path: typing.List[int] = path - def from_str(origin_str): + def from_str(origin_str: str) -> DescriporKeyOrigin: # Origing starts and ends with brackets if not origin_str.startswith("[") or not origin_str.endswith("]"): raise DescriptorKeyError(f"Insane origin: '{origin_str}'") @@ -54,7 +59,7 @@ class KeyPathKind(Enum): WILDCARD_UNHARDENED = auto() WILDCARD_HARDENED = auto() - def is_wildcard(self): + def is_wildcard(self) -> bool: return self in [KeyPathKind.WILDCARD_HARDENED, KeyPathKind.WILDCARD_UNHARDENED] @@ -64,13 +69,13 @@ class DescriptorKeyPath: See https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions. """ - def __init__(self, path, kind): + def __init__(self, path: typing.List[int], kind: KeyPathKind): assert isinstance(path, list) and isinstance(kind, KeyPathKind) - self.path = path - self.kind = kind + self.path: typing.List[int] = path + self.kind: KeyPathKind = kind - def from_str(path_str): + def from_str(path_str: str) -> DescriptorKeyPath: if len(path_str) < 1: raise DescriptorKeyError(f"Insane key path: '{path_str}'") if path_str[0] == "/": @@ -108,7 +113,11 @@ class DescriptorKey: May be an extended or raw public key. """ - def __init__(self, key): + origin: typing.Optional[DescriporKeyOrigin] + path: typing.Optional[DescriptorKeyPath] + key: typing.Union[coincurve.PublicKey, BIP32] + + def __init__(self, key: typing.Union[bytes, BIP32, str]): # Information about the origin of this key. self.origin = None # If it is an xpub, a path toward a child key of that xpub. @@ -156,10 +165,10 @@ def __init__(self, key): "Invalid parameter type: expecting bytes, hex str or BIP32 instance." ) - def __repr__(self): + def __repr__(self) -> str: key = "" - def ser_path(key, path): + def ser_path(key: str, path: typing.List[int]) -> str: for i in path: if i < 2**31: key += f"/{i}" @@ -185,7 +194,7 @@ def ser_path(key, path): return key - def bytes(self): + def bytes(self) -> bytes: if isinstance(self.key, coincurve.PublicKey): return self.key.format() else: @@ -195,7 +204,7 @@ def bytes(self): assert not self.path.kind.is_wildcard() # TODO: real errors return self.key.get_pubkey_from_path(self.path.path) - def derive(self, index): + def derive(self, index: int) -> None: """Derive the key at the given index. A no-op if the key isn't a wildcard. Will start from 2**31 if the key is a "hardened diff --git a/bip380/miniscript/errors.py b/bip380/miniscript/errors.py index 9dc4f19..927cf58 100644 --- a/bip380/miniscript/errors.py +++ b/bip380/miniscript/errors.py @@ -4,12 +4,12 @@ class MiniscriptNodeCreationError(ValueError): - def __init__(self, message): - self.message = message + def __init__(self, message: str): + self.message: str = message class MiniscriptPropertyError(ValueError): - def __init__(self, message): - self.message = message + def __init__(self, message: str): + self.message: str = message # TODO: errors for type errors, parsing errors, etc.. diff --git a/bip380/miniscript/fragments.py b/bip380/miniscript/fragments.py index dfe9098..2dca354 100644 --- a/bip380/miniscript/fragments.py +++ b/bip380/miniscript/fragments.py @@ -4,14 +4,16 @@ Each element correspond to a Bitcoin Script fragment, and has various type properties. See the Miniscript website for the specification of the type system: https://bitcoin.sipa.be/miniscript/. """ +from __future__ import annotations -import copy -import bip380.miniscript.parsing as parsing +import typing +import bip380.miniscript.parsing as parsing from bip380.key import DescriptorKey from bip380.utils.hashes import hash160 from bip380.utils.script import ( CScript, + CScriptOp, OP_1, OP_0, OP_ADD, @@ -42,10 +44,9 @@ OP_VERIFY, OP_0NOTEQUAL, ) - from .errors import MiniscriptNodeCreationError from .property import Property -from .satisfaction import ExecutionInfo, Satisfaction +from .satisfaction import ExecutionInfo, Satisfaction, SatisfactionMaterial # Threshold for nLockTime: below this value it is interpreted as block number, @@ -62,61 +63,64 @@ class Node: """A Miniscript fragment.""" # The fragment's type and properties - p = None + p: Property = None # List of all sub fragments - subs = [] + subs: typing.List[Node] = [] # A list of Script elements, a CScript is created all at once in the script() method. - _script = [] + _script: typing.List[CScriptOp] = [] # Whether any satisfaction for this fragment require a signature - needs_sig = None + needs_sig: bool = None # Whether any dissatisfaction for this fragment requires a signature - is_forced = None + is_forced: bool = None # Whether this fragment has a unique unconditional satisfaction, and all conditional # ones require a signature. - is_expressive = None + is_expressive: bool = None # Whether for any possible way to satisfy this fragment (may be none), a # non-malleable satisfaction exists. - is_nonmalleable = None + is_nonmalleable: bool = None # Whether this node or any of its subs contains an absolute heightlock - abs_heightlocks = None + abs_heightlocks: bool = None # Whether this node or any of its subs contains a relative heightlock - rel_heightlocks = None + rel_heightlocks: bool = None # Whether this node or any of its subs contains an absolute timelock - abs_timelocks = None + abs_timelocks: bool = None # Whether this node or any of its subs contains a relative timelock - rel_timelocks = None + rel_timelocks: bool = None # Whether this node does not contain a mix of timelock or heightlock of different types. # That is, not (abs_heightlocks and rel_heightlocks or abs_timelocks and abs_timelocks) - no_timelock_mix = None + no_timelock_mix: bool = None # Information about this Miniscript execution (satisfaction cost, etc..) - exec_info = None + exec_info: ExecutionInfo = None def __init__(self, *args, **kwargs): # Needs to be implemented by derived classes. raise NotImplementedError - def from_str(ms_str): + def from_str(ms_str: str) -> Node: """Parse a Miniscript fragment from its string representation.""" assert isinstance(ms_str, str) return parsing.miniscript_from_str(ms_str) - def from_script(script, pkh_preimages={}): + def from_script( + script: CScript, + pkh_preimages: typing.Dict[bytes, typing.Union[bytes, str, DescriptorKey]] = {} + ) -> CScriptOp: """Decode a Miniscript fragment from its Script representation.""" assert isinstance(script, CScript) return parsing.miniscript_from_script(script, pkh_preimages) # TODO: have something like BuildScript from Core and get rid of the _script member. @property - def script(self): + def script(self) -> CScript: return CScript(self._script) @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: """Get the list of all keys from this Miniscript, in order of apparition.""" # Overriden by fragments that actually have keys. return [key for sub in self.subs for key in sub.keys] - def satisfy(self, sat_material): + def satisfy(self, sat_material: SatisfactionMaterial) -> typing.Optional[typing.List[bytes]]: """Get the witness of the smallest non-malleable satisfaction for this fragment, if one exists. @@ -128,7 +132,7 @@ def satisfy(self, sat_material): return None return sat.witness - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: """Get the satisfaction for this fragment. :param sat_material: a SatisfactionMaterial containing available data to satisfy @@ -137,7 +141,7 @@ def satisfaction(self, sat_material): # Needs to be implemented by derived classes. raise NotImplementedError - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: """Get the dissatisfaction for this fragment.""" # Needs to be implemented by derived classes. raise NotImplementedError @@ -160,13 +164,13 @@ def __init__(self): self.no_timelock_mix = True self.exec_info = ExecutionInfo(0, 0, None, 0) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.unavailable() - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[]) - def __repr__(self): + def __repr__(self) -> str: return "0" @@ -187,13 +191,13 @@ def __init__(self): self.no_timelock_mix = True self.exec_info = ExecutionInfo(0, 0, 0, None) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction(witness=[]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() - def __repr__(self): + def __repr__(self) -> str: return "1" @@ -203,7 +207,7 @@ class PkNode(Node): Should not be instanced directly, use Pk() or Pkh(). """ - def __init__(self, pubkey): + def __init__(self, pubkey: typing.Union[bytes, str, DescriptorKey]): if isinstance(pubkey, bytes) or isinstance(pubkey, str): self.pubkey = DescriptorKey(pubkey) @@ -223,69 +227,69 @@ def __init__(self, pubkey): self.no_timelock_mix = True @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: return [self.pubkey] # TODO: A PkNode class to inherit those two from? class Pk(PkNode): - def __init__(self, pubkey): + def __init__(self, pubkey: typing.Union[bytes, str, DescriptorKey]): PkNode.__init__(self, pubkey) self.p = Property("Konud") self.exec_info = ExecutionInfo(0, 0, 0, 0) @property - def _script(self): + def _script(self) -> typing.List[bytes]: return [self.pubkey.bytes()] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: sig = sat_material.signatures.get(self.pubkey.bytes()) if sig is None: return Satisfaction.unavailable() return Satisfaction([sig], has_sig=True) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[b""]) - def __repr__(self): + def __repr__(self) -> str: return f"pk_k({self.pubkey})" class Pkh(PkNode): # FIXME: should we support a hash here, like rust-bitcoin? I don't think it's safe. - def __init__(self, pubkey): + def __init__(self, pubkey: typing.Union[bytes, str, DescriptorKey]): PkNode.__init__(self, pubkey) self.p = Property("Knud") self.exec_info = ExecutionInfo(3, 0, 1, 1) @property - def _script(self): + def _script(self) -> typing.List[typing.Union[CScriptOp, bytes]]: return [OP_DUP, OP_HASH160, self.pk_hash(), OP_EQUALVERIFY] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: sig = sat_material.signatures.get(self.pubkey.bytes()) if sig is None: return Satisfaction.unavailable() return Satisfaction(witness=[sig, self.pubkey.bytes()], has_sig=True) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[b"", self.pubkey.bytes()]) - def __repr__(self): + def __repr__(self) -> str: return f"pk_h({self.pubkey})" - def pk_hash(self): + def pk_hash(self) -> bytes: assert isinstance(self.pubkey, DescriptorKey) return hash160(self.pubkey.bytes()) class Older(Node): - def __init__(self, value): + def __init__(self, value: int): assert value > 0 and value < 2 ** 31 - self.value = value + self.value: int = value self._script = [self.value, OP_CHECKSEQUENCEVERIFY] self.p = Property("Bz") @@ -300,23 +304,23 @@ def __init__(self, value): self.no_timelock_mix = True self.exec_info = ExecutionInfo(1, 0, 0, None) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: if sat_material.max_sequence < self.value: return Satisfaction.unavailable() return Satisfaction(witness=[]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() - def __repr__(self): + def __repr__(self) -> str: return f"older({self.value})" class After(Node): - def __init__(self, value): + def __init__(self, value: int): assert value > 0 and value < 2 ** 31 - self.value = value + self.value: int = value self._script = [self.value, OP_CHECKLOCKTIMEVERIFY] self.p = Property("Bz") @@ -331,15 +335,15 @@ def __init__(self, value): self.no_timelock_mix = True self.exec_info = ExecutionInfo(1, 0, 0, None) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: if sat_material.max_lock_time < self.value: return Satisfaction.unavailable() return Satisfaction(witness=[]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() - def __repr__(self): + def __repr__(self) -> str: return f"after({self.value})" @@ -349,10 +353,10 @@ class HashNode(Node): Should not be instanced directly, use concrete fragments instead. """ - def __init__(self, digest, hash_op): + def __init__(self, digest: bytes, hash_op: CScriptOp): assert isinstance(digest, bytes) # TODO: real errors - self.digest = digest + self.digest: bytes = digest self._script = [OP_SIZE, 32, OP_EQUALVERIFY, hash_op, digest, OP_EQUAL] self.p = Property("Bonud") @@ -367,60 +371,60 @@ def __init__(self, digest, hash_op): self.no_timelock_mix = True self.exec_info = ExecutionInfo(4, 0, 1, None) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: preimage = sat_material.preimages.get(self.digest) if preimage is None: return Satisfaction.unavailable() return Satisfaction(witness=[preimage]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() return Satisfaction(witness=[b""]) class Sha256(HashNode): - def __init__(self, digest): + def __init__(self, digest: bytes): assert len(digest) == 32 # TODO: real errors HashNode.__init__(self, digest, OP_SHA256) - def __repr__(self): + def __repr__(self) -> str: return f"sha256({self.digest.hex()})" class Hash256(HashNode): - def __init__(self, digest): + def __init__(self, digest: bytes): assert len(digest) == 32 # TODO: real errors HashNode.__init__(self, digest, OP_HASH256) - def __repr__(self): + def __repr__(self) -> str: return f"hash256({self.digest.hex()})" class Ripemd160(HashNode): - def __init__(self, digest): + def __init__(self, digest: bytes): assert len(digest) == 20 # TODO: real errors HashNode.__init__(self, digest, OP_RIPEMD160) - def __repr__(self): + def __repr__(self) -> str: return f"ripemd160({self.digest.hex()})" class Hash160(HashNode): - def __init__(self, digest): + def __init__(self, digest: bytes): assert len(digest) == 20 # TODO: real errors HashNode.__init__(self, digest, OP_HASH160) - def __repr__(self): + def __repr__(self) -> str: return f"hash160({self.digest.hex()})" class Multi(Node): - def __init__(self, k, keys): + def __init__(self, k: int, keys: typing.List[DescriptorKey]): assert 1 <= k <= len(keys) assert all(isinstance(k, DescriptorKey) for k in keys) - self.k = k - self.pubkeys = keys + self.k: int = k + self.pubkeys: typing.List[DescriptorKey] = keys self.p = Property("Bndu") self.needs_sig = True @@ -435,11 +439,11 @@ def __init__(self, k, keys): self.exec_info = ExecutionInfo(1, len(keys), 1 + k, 1 + k) @property - def keys(self): + def keys(self) -> typing.List[DescriptorKey]: return self.pubkeys @property - def _script(self): + def _script(self) -> typing.List[int, bytes, CScriptOp]: return [ self.k, *[k.bytes() for k in self.keys], @@ -447,7 +451,7 @@ def _script(self): OP_CHECKMULTISIG, ] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: sigs = [] for key in self.keys: sig = sat_material.signatures.get(key.bytes()) @@ -460,15 +464,15 @@ def satisfaction(self, sat_material): return Satisfaction.unavailable() return Satisfaction(witness=[b""] + sigs, has_sig=True) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[b""] * (self.k + 1)) - def __repr__(self): + def __repr__(self) -> str: return f"multi({','.join([str(self.k)] + [str(k) for k in self.keys])})" class AndV(Node): - def __init__(self, sub_x, sub_y): + def __init__(self, sub_x: Node, sub_y: Node): assert sub_x.p.V assert sub_y.p.has_any("BKV") @@ -499,21 +503,21 @@ def __init__(self, sub_x, sub_y): self.exec_info.set_undissatisfiable() # it's V. @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return sum((sub._script for sub in self.subs), start=[]) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_concat(sat_material, *self.subs) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() # it's V. - def __repr__(self): + def __repr__(self) -> str: return f"and_v({','.join(map(str, self.subs))})" class AndB(Node): - def __init__(self, sub_x, sub_y): + def __init__(self, sub_x: Node, sub_y: Node): assert sub_x.p.B and sub_y.p.W self.subs = [sub_x, sub_y] @@ -556,21 +560,21 @@ def __init__(self, sub_x, sub_y): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return sum((sub._script for sub in self.subs), start=[]) + [OP_BOOLAND] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_concat(sat_material, self.subs[0], self.subs[1]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - def __repr__(self): + def __repr__(self) -> str: return f"and_b({','.join(map(str, self.subs))})" class OrB(Node): - def __init__(self, sub_x, sub_z): + def __init__(self, sub_x: Node, sub_z: Node): assert sub_x.p.has_all("Bd") assert sub_z.p.has_all("Wd") @@ -597,23 +601,23 @@ def __init__(self, sub_x, sub_z): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return sum((sub._script for sub in self.subs), start=[]) + [OP_BOOLOR] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_concat( sat_material, self.subs[0], self.subs[1], disjunction=True ) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - def __repr__(self): + def __repr__(self) -> str: return f"or_b({','.join(map(str, self.subs))})" class OrC(Node): - def __init__(self, sub_x, sub_z): + def __init__(self, sub_x: Node, sub_z: Node): assert sub_x.p.has_all("Bdu") and sub_z.p.V self.subs = [sub_x, sub_z] @@ -642,21 +646,21 @@ def __init__(self, sub_x, sub_z): self.exec_info.set_undissatisfiable() # it's V. @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return self.subs[0]._script + [OP_NOTIF] + self.subs[1]._script + [OP_ENDIF] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_or_uneven(sat_material, self.subs[0], self.subs[1]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() # it's V. - def __repr__(self): + def __repr__(self) -> str: return f"or_c({','.join(map(str, self.subs))})" class OrD(Node): - def __init__(self, sub_x, sub_z): + def __init__(self, sub_x: Node, sub_z: Node): assert sub_x.p.has_all("Bdu") assert sub_z.p.has_all("B") @@ -687,7 +691,7 @@ def __init__(self, sub_x, sub_z): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return ( self.subs[0]._script + [OP_IFDUP, OP_NOTIF] @@ -695,18 +699,18 @@ def _script(self): + [OP_ENDIF] ) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_or_uneven(sat_material, self.subs[0], self.subs[1]) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - def __repr__(self): + def __repr__(self) -> str: return f"or_d({','.join(map(str, self.subs))})" class OrI(Node): - def __init__(self, sub_x, sub_z): + def __init__(self, sub_x: Node, sub_z: Node): assert sub_x.p.type() == sub_z.p.type() and sub_x.p.has_any("BKV") self.subs = [sub_x, sub_z] @@ -738,7 +742,7 @@ def __init__(self, sub_x, sub_z): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return ( [OP_IF] + self.subs[0]._script @@ -747,22 +751,22 @@ def _script(self): + [OP_ENDIF] ) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return (self.subs[0].satisfaction(sat_material) + Satisfaction([b"\x01"])) | ( self.subs[1].satisfaction(sat_material) + Satisfaction([b""]) ) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return (self.subs[0].dissatisfaction() + Satisfaction(witness=[b"\x01"])) | ( self.subs[1].dissatisfaction() + Satisfaction(witness=[b""]) ) - def __repr__(self): + def __repr__(self) -> str: return f"or_i({','.join(map(str, self.subs))})" class AndOr(Node): - def __init__(self, sub_x, sub_y, sub_z): + def __init__(self, sub_x: Node, sub_y: Node, sub_z: Node): assert sub_x.p.has_all("Bdu") assert sub_y.p.type() == sub_z.p.type() and sub_y.p.has_any("BKV") @@ -813,7 +817,7 @@ def __init__(self, sub_x, sub_y, sub_z): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return ( self.subs[0]._script + [OP_NOTIF] @@ -823,35 +827,35 @@ def _script(self): + [OP_ENDIF] ) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: # (A and B) or (!A and C) return ( self.subs[1].satisfaction(sat_material) + self.subs[0].satisfaction(sat_material) ) | (self.subs[2].satisfaction(sat_material) + self.subs[0].dissatisfaction()) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: # Dissatisfy X and Z return self.subs[2].dissatisfaction() + self.subs[0].dissatisfaction() - def __repr__(self): + def __repr__(self) -> str: return f"andor({','.join(map(str, self.subs))})" class AndN(AndOr): - def __init__(self, sub_x, sub_y): + def __init__(self, sub_x: Node, sub_y: Node): AndOr.__init__(self, sub_x, sub_y, Just0()) - def __repr__(self): + def __repr__(self) -> str: return f"and_n({self.subs[0]},{self.subs[1]})" class Thresh(Node): - def __init__(self, k, subs): + def __init__(self, k: int, subs: typing.List[Node]): n = len(subs) assert 1 <= k <= n - self.k = k + self.k: int = k self.subs = subs all_z = True @@ -908,22 +912,22 @@ def __init__(self, k, subs): self.exec_info = ExecutionInfo.from_thresh(k, [sub.exec_info for sub in subs]) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return ( self.subs[0]._script + sum(((sub._script + [OP_ADD]) for sub in self.subs[1:]), start=[]) + [self.k, OP_EQUAL] ) - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction.from_thresh(sat_material, self.k, self.subs) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return sum( [sub.dissatisfaction() for sub in self.subs], start=Satisfaction(witness=[]) ) - def __repr__(self): + def __repr__(self) -> str: return f"thresh({self.k},{','.join(map(str, self.subs))})" @@ -933,7 +937,7 @@ class WrapperNode(Node): Don't instanciate it directly, use concret wrapper fragments instead. """ - def __init__(self, sub): + def __init__(self, sub: Node): self.subs = [sub] # Properties for most wrappers are directly inherited. When it's not, they @@ -954,19 +958,19 @@ def __init__(self, sub): ) @property - def sub(self): + def sub(self) -> Node: # Wrapper have a single sub return self.subs[0] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: # Most wrappers are satisfied this way, for special cases it's overriden. return self.subs[0].satisfaction(sat_material) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: # Most wrappers are satisfied this way, for special cases it's overriden. return self.subs[0].dissatisfaction() - def skip_colon(self): + def skip_colon(self) -> bool: # We need to check this because of the pk() and pkh() aliases. if isinstance(self.subs[0], WrapC) and isinstance( self.subs[0].subs[0], (Pk, Pkh) @@ -976,7 +980,7 @@ def skip_colon(self): class WrapA(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.B WrapperNode.__init__(self, sub) @@ -984,10 +988,10 @@ def __init__(self, sub): self.exec_info = ExecutionInfo.from_wrap(sub.exec_info, ops_count=2) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return [OP_TOALTSTACK] + self.sub._script + [OP_FROMALTSTACK] - def __repr__(self): + def __repr__(self) -> str: # Don't duplicate colons if self.skip_colon(): return f"a{self.subs[0]}" @@ -995,7 +999,7 @@ def __repr__(self): class WrapS(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.has_all("Bo") WrapperNode.__init__(self, sub) @@ -1003,10 +1007,10 @@ def __init__(self, sub): self.exec_info = ExecutionInfo.from_wrap(sub.exec_info, ops_count=1) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return [OP_SWAP] + self.sub._script - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"s{self.subs[0]}" @@ -1014,7 +1018,7 @@ def __repr__(self): class WrapC(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.K WrapperNode.__init__(self, sub) @@ -1026,10 +1030,10 @@ def __init__(self, sub): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return self.sub._script + [OP_CHECKSIG] - def __repr__(self): + def __repr__(self) -> str: # Special case of aliases if isinstance(self.subs[0], Pk): return f"pk({self.subs[0].pubkey})" @@ -1042,13 +1046,13 @@ def __repr__(self): class WrapT(AndV, WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): AndV.__init__(self, sub, Just1()) - def is_wrapper(self): + def is_wrapper(self) -> True: return True - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"t{self.subs[0]}" @@ -1056,7 +1060,7 @@ def __repr__(self): class WrapD(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.has_all("Vz") WrapperNode.__init__(self, sub) @@ -1068,16 +1072,16 @@ def __init__(self, sub): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return [OP_DUP, OP_IF] + self.sub._script + [OP_ENDIF] - def satisfaction(self, sat_material): + def satisfaction(self, sat_material: SatisfactionMaterial) -> Satisfaction: return Satisfaction(witness=[b"\x01"]) + self.subs[0].satisfaction(sat_material) - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[b""]) - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"d{self.subs[0]}" @@ -1085,7 +1089,7 @@ def __repr__(self): class WrapV(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.B WrapperNode.__init__(self, sub) @@ -1096,7 +1100,7 @@ def __init__(self, sub): self.exec_info = ExecutionInfo.from_wrap(sub.exec_info, ops_count=verify_cost) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: if self.sub._script[-1] == OP_CHECKSIG: return self.sub._script[:-1] + [OP_CHECKSIGVERIFY] elif self.sub._script[-1] == OP_CHECKMULTISIG: @@ -1105,10 +1109,10 @@ def _script(self): return self.sub._script[:-1] + [OP_EQUALVERIFY] return self.sub._script + [OP_VERIFY] - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction.unavailable() # It's V. - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"v{self.subs[0]}" @@ -1116,7 +1120,7 @@ def __repr__(self): class WrapJ(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.has_all("Bn") WrapperNode.__init__(self, sub) @@ -1128,13 +1132,13 @@ def __init__(self, sub): ) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return [OP_SIZE, OP_0NOTEQUAL, OP_IF, *self.sub._script, OP_ENDIF] - def dissatisfaction(self): + def dissatisfaction(self) -> Satisfaction: return Satisfaction(witness=[b""]) - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"j{self.subs[0]}" @@ -1142,7 +1146,7 @@ def __repr__(self): class WrapN(WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): assert sub.p.B WrapperNode.__init__(self, sub) @@ -1150,10 +1154,10 @@ def __init__(self, sub): self.exec_info = ExecutionInfo.from_wrap(sub.exec_info, ops_count=1) @property - def _script(self): + def _script(self) -> typing.List[CScriptOp]: return [*self.sub._script, OP_0NOTEQUAL] - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"n{self.subs[0]}" @@ -1161,10 +1165,10 @@ def __repr__(self): class WrapL(OrI, WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): OrI.__init__(self, Just0(), sub) - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"l{self.subs[1]}" @@ -1172,10 +1176,10 @@ def __repr__(self): class WrapU(OrI, WrapperNode): - def __init__(self, sub): + def __init__(self, sub: Node): OrI.__init__(self, sub, Just0()) - def __repr__(self): + def __repr__(self) -> str: # Avoid duplicating colons if self.skip_colon(): return f"u{self.subs[0]}" diff --git a/bip380/miniscript/parsing.py b/bip380/miniscript/parsing.py index 5c9a36c..16445d8 100644 --- a/bip380/miniscript/parsing.py +++ b/bip380/miniscript/parsing.py @@ -2,10 +2,14 @@ Utilities to parse Miniscript from string and Script representations. """ -import bip380.miniscript.fragments as fragments +from __future__ import annotations + +import typing +import bip380.miniscript.fragments as fragments from bip380.key import DescriptorKey from bip380.utils.script import ( + CScript, CScriptOp, OP_ADD, OP_BOOLAND, @@ -39,7 +43,7 @@ ) -def stack_item_to_int(item): +def stack_item_to_int(item: typing.Union[bytes, fragments.Node, int]) -> typing.Optional[int]: """ Convert a stack item to an integer depending on its type. May raise an exception if the item is bytes, otherwise return None if it @@ -60,7 +64,7 @@ def stack_item_to_int(item): return None -def decompose_script(script): +def decompose_script(script: CScript) -> typing.List[CScriptOp]: """Create a list of Script element from a CScript, decomposing the compact -VERIFY opcodes into the non-VERIFY OP and an OP_VERIFY. """ @@ -77,7 +81,7 @@ def decompose_script(script): return elems -def parse_term_single_elem(expr_list, idx): +def parse_term_single_elem(expr_list: typing.List[CScriptOp], idx: int) -> None: """ Try to parse a terminal node from the element of {expr_list} at {idx}. """ @@ -96,7 +100,7 @@ def parse_term_single_elem(expr_list, idx): expr_list[idx] = fragments.Just0() -def parse_term_2_elems(expr_list, idx): +def parse_term_2_elems(expr_list: typing.List[CScriptOp], idx: int) -> typing.Optional[typing.List[CScriptOp]]: """ Try to parse a terminal node from two elements of {expr_list}, starting from {idx}. @@ -129,7 +133,11 @@ def parse_term_2_elems(expr_list, idx): return expr_list -def parse_term_5_elems(expr_list, idx, pkh_preimages={}): +def parse_term_5_elems( + expr_list: typing.List[CScriptOp], + idx: int, + pkh_preimages: typing.Dict[CScriptOp, typing.Union[bytes, str, DescriptorKey]] = {}, +) -> typing.Optional[typing.List[CScriptOp]]: """ Try to parse a terminal node from five elements of {expr_list}, starting from {idx}. @@ -153,7 +161,7 @@ def parse_term_5_elems(expr_list, idx, pkh_preimages={}): return expr_list -def parse_term_7_elems(expr_list, idx): +def parse_term_7_elems(expr_list: typing.List[CScriptOp], idx: int) -> typing.Optional[typing.List[CScriptOp]]: """ Try to parse a terminal node from seven elements of {expr_list}, starting from {idx}. @@ -206,7 +214,10 @@ def parse_term_7_elems(expr_list, idx): return expr_list -def parse_nonterm_2_elems(expr_list, idx): +def parse_nonterm_2_elems( + expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]], + idx: int, +) -> typing.Optional[typing.List[typing.Union[CScriptOp, fragments.Node]]]: """ Try to parse a non-terminal node from two elements of {expr_list}, starting from {idx}. @@ -251,7 +262,10 @@ def parse_nonterm_2_elems(expr_list, idx): return expr_list -def parse_nonterm_3_elems(expr_list, idx): +def parse_nonterm_3_elems( + expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]], + idx: int, +) -> typing.Optional[typing.List[typing.Union[CScriptOp, fragments.Node]]]: """ Try to parse a non-terminal node from *at least* three elements of {expr_list}, starting from {idx}. @@ -318,7 +332,10 @@ def parse_nonterm_3_elems(expr_list, idx): return expr_list -def parse_nonterm_4_elems(expr_list, idx): +def parse_nonterm_4_elems( + expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]], + idx: int, +) -> typing.Optional[typing.List[typing.Union[CScriptOp, fragments.Node]]]: """ Try to parse a non-terminal node from at least four elements of {expr_list}, starting from {idx}. @@ -376,7 +393,10 @@ def parse_nonterm_4_elems(expr_list, idx): return expr_list -def parse_nonterm_5_elems(expr_list, idx): +def parse_nonterm_5_elems( + expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]], + idx: int, +) -> typing.Optional[typing.List[typing.Union[CScriptOp, fragments.Node]]]: """ Try to parse a non-terminal node from five elements of {expr_list}, starting from {idx}. @@ -427,7 +447,10 @@ def parse_nonterm_5_elems(expr_list, idx): return expr_list -def parse_nonterm_6_elems(expr_list, idx): +def parse_nonterm_6_elems( + expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]], + idx: int, +) -> typing.Optional[typing.List[typing.Union[CScriptOp, fragments.Node]]]: """ Try to parse a non-terminal node from six elements of {expr_list}, starting from {idx}. @@ -455,7 +478,7 @@ def parse_nonterm_6_elems(expr_list, idx): return expr_list -def parse_expr_list(expr_list): +def parse_expr_list(expr_list: typing.List[typing.Union[CScriptOp, fragments.Node]]) -> fragments.Node: """Parse a node from a list of Script elements.""" # Every recursive call must progress the AST construction, # until it is complete (single root node remains). @@ -501,8 +524,11 @@ def parse_expr_list(expr_list): raise Exception("Malformed miniscript") -def miniscript_from_script(script, pkh_preimages={}): - """Construct miniscript node from script. +def miniscript_from_script( + script: CScript, + pkh_preimages: typing.Dict[CScriptOp, typing.Union[bytes, str, DescriptorKey]] = {}, +) -> fragments.Node: + """Construct miniscript node from script. # TODO: we return CScriptOp, not a Node? :param script: The Bitcoin Script to decode. :param pkh_preimage: A mapping from keyhash to key to decode pk_h() fragments. @@ -539,7 +565,7 @@ def miniscript_from_script(script, pkh_preimages={}): return parse_expr_list(expr_list) -def split_params(string): +def split_params(string: str) -> typing.Tuple[typing.List[str], str]: """Read a list of values before the next ')'. Split the result by comma.""" i = string.find(")") assert i >= 0 @@ -551,7 +577,7 @@ def split_params(string): return params.split(","), "" -def parse_many(string): +def parse_many(string: str) -> typing.Optional[typing.Tuple[typing.List[fragments.Node]], str]: """Read a list of nodes before the next ')'.""" subs = [] remaining = string @@ -564,7 +590,7 @@ def parse_many(string): remaining = remaining[1:] -def parse_one_num(string): +def parse_one_num(string: str) -> int: """Read an integer before the next comma.""" i = string.find(",") assert i >= 0 @@ -572,7 +598,7 @@ def parse_one_num(string): return int(string[:i]), string[i + 1 :] -def parse_one(string): +def parse_one(string: str) -> typing.Optional[typing.Tuple[fragments.Node, str]]: """Read a node and its subs recursively from a string. Returns the node and the part of the string not consumed. """ @@ -728,7 +754,7 @@ def parse_one(string): assert False, (tag, subs, remaining) # TODO -def miniscript_from_str(ms_str): +def miniscript_from_str(ms_str: str) -> fragments.Node: """Construct miniscript node from string representation""" node, remaining = parse_one(ms_str) assert remaining == "" diff --git a/bip380/miniscript/property.py b/bip380/miniscript/property.py index 9058bc3..93fd552 100644 --- a/bip380/miniscript/property.py +++ b/bip380/miniscript/property.py @@ -23,10 +23,10 @@ class Property: # "f": Forced property # "s": Safe property # "m": Nonmalleable property - types = "BVKW" - props = "zonduefsm" + types: str = "BVKW" + props: str = "zonduefsm" - def __init__(self, property_str=""): + def __init__(self, property_str: str = ""): """Create a property, optionally from a str of property and types""" for c in property_str: if c not in self.types + self.props: @@ -35,19 +35,19 @@ def __init__(self, property_str=""): for literal in self.types + self.props: setattr(self, literal, literal in property_str) - def __repr__(self): + def __repr__(self) -> str: """Generate string representation of property""" return "".join([c for c in self.types + self.props if getattr(self, c)]) - def has_all(self, properties): + def has_all(self, properties: str) -> bool: """Given a str of types and properties, return whether we have all of them""" return all([getattr(self, pt) for pt in properties]) - def has_any(self, properties): + def has_any(self, properties: str) -> bool: """Given a str of types and properties, return whether we have at least one of them""" return any([getattr(self, pt) for pt in properties]) - def check_valid(self): + def check_valid(self) -> None: """Raises a MiniscriptPropertyError if the types/properties conflict""" # Can only be of a single type. if len(self.type()) > 1: @@ -75,8 +75,8 @@ def check_valid(self): if conflicts: raise MiniscriptPropertyError(f"Conflicting types and properties: {', '.join(conflicts)}") - def type(self): + def type(self) -> str: return "".join(filter(lambda x: x in self.types, str(self))) - def properties(self): + def properties(self) -> str: return "".join(filter(lambda x: x in self.props, str(self))) diff --git a/bip380/miniscript/satisfaction.py b/bip380/miniscript/satisfaction.py index d3b8424..5110800 100644 --- a/bip380/miniscript/satisfaction.py +++ b/bip380/miniscript/satisfaction.py @@ -8,15 +8,24 @@ non-canonical (dis)satisfactions. """ +from __future__ import annotations -def add_optional(a, b): +import typing + +OptAny = typing.Optional[typing.Any] + +if typing.TYPE_CHECKING: + from bip380.miniscript.fragments import Node + + +def add_optional(a: OptAny, b: OptAny) -> OptAny: """Add two numbers that may be None together.""" if a is None or b is None: return None return a + b -def max_optional(a, b): +def max_optional(a: OptAny, b: OptAny) -> OptAny: """Return the maximum of two numbers that may be None.""" if a is None: return b @@ -29,7 +38,11 @@ class SatisfactionMaterial: """Data that may be needed in order to satisfy a Minsicript fragment.""" def __init__( - self, preimages={}, signatures={}, max_sequence=2 ** 32, max_lock_time=2 ** 32 + self, + preimages: typing.Dict[bytes, bytes] = {}, + signatures: typing.Dict[bytes, bytes] = {}, + max_sequence: int = 2 ** 32, + max_lock_time: int = 2 ** 32, ): """ :param preimages: Mapping from a hash (as bytes), to its 32-bytes preimage. @@ -37,18 +50,18 @@ def __init__( :param max_sequence: The maximum relative timelock possible (coin age). :param max_lock_time: The maximum absolute timelock possible (block height). """ - self.preimages = preimages - self.signatures = signatures - self.max_sequence = max_sequence - self.max_lock_time = max_lock_time + self.preimages: typing.Dict[bytes, bytes] = preimages + self.signatures: typing.Dict[bytes, bytes] = signatures + self.max_sequence: int = max_sequence + self.max_lock_time: int = max_lock_time - def clear(self): + def clear(self) -> None: self.preimages.clear() self.signatures.clear() self.max_sequence = 0 self.max_lock_time = 0 - def __repr__(self): + def __repr__(self) -> str: return ( f"SatisfactionMaterial(preimages: {self.preimages}, signatures: " f"{self.signatures}, max_sequence: {self.max_sequence}, max_lock_time: " @@ -59,22 +72,22 @@ def __repr__(self): class Satisfaction: """All information about a satisfaction.""" - def __init__(self, witness, has_sig=False): + def __init__(self, witness: typing.Optional[typing.List[bytes]], has_sig: bool = False): assert isinstance(witness, list) or witness is None - self.witness = witness - self.has_sig = has_sig + self.witness: typing.Optional[typing.List[bytes]] = witness + self.has_sig: bool = has_sig # TODO: we probably need to take into account non-canon sats, as the algorithm # described on the website mandates it: # > Iterate over all the valid satisfactions/dissatisfactions in the table above # > (including the non-canonical ones), - def __add__(self, other): + def __add__(self, other: Satisfaction) -> Satisfaction: """Concatenate two satisfactions together.""" witness = add_optional(self.witness, other.witness) has_sig = self.has_sig or other.has_sig return Satisfaction(witness, has_sig) - def __or__(self, other): + def __or__(self, other: Satisfaction) -> Satisfaction: """Choose between two (dis)satisfactions.""" assert isinstance(other, Satisfaction) @@ -109,16 +122,21 @@ def __or__(self, other): return self - def unavailable(): + def unavailable() -> Satisfaction: return Satisfaction(witness=None) - def is_unavailable(self): + def is_unavailable(self) -> bool: return self.witness is None - def size(self): + def size(self) -> int: return len(self.witness) + sum(len(elem) for elem in self.witness) - def from_concat(sat_material, sub_a, sub_b, disjunction=False): + def from_concat( + sat_material: SatisfactionMaterial, + sub_a: Node, + sub_b: Node, + disjunction: bool = False, + ) -> Satisfaction: """Get the satisfaction for a Miniscript whose Script corresponds to a concatenation of two subscripts A and B. @@ -132,7 +150,7 @@ def from_concat(sat_material, sub_a, sub_b, disjunction=False): ) return sub_b.satisfaction(sat_material) + sub_a.satisfaction(sat_material) - def from_or_uneven(sat_material, sub_a, sub_b): + def from_or_uneven(sat_material: SatisfactionMaterial, sub_a: Node, sub_b: Node) -> Satisfaction: """Get the satisfaction for a Miniscript which unconditionally executes a first sub A and only executes B if A was dissatisfied. @@ -143,7 +161,7 @@ def from_or_uneven(sat_material, sub_a, sub_b): sub_b.satisfaction(sat_material) + sub_a.dissatisfaction() ) - def from_thresh(sat_material, k, subs): + def from_thresh(sat_material: SatisfactionMaterial, k: int, subs: typing.List[Node]) -> Satisfaction: """Get the satisfaction for a Miniscript which satisfies k of the given subs, and dissatisfies all the others. @@ -189,24 +207,24 @@ def from_thresh(sat_material, k, subs): class ExecutionInfo: """Information about the execution of a Miniscript.""" - def __init__(self, stat_ops, _dyn_ops, sat_size, dissat_size): + def __init__(self, stat_ops: int, _dyn_ops: int, sat_size: int, dissat_size: int): # The *maximum* number of *always* executed non-PUSH Script OPs to satisfy this # Miniscript fragment non-malleably. - self._static_ops_count = stat_ops + self._static_ops_count: int = stat_ops # The maximum possible number of counted-as-executed-by-interpreter OPs if this # fragment is executed. # It is only >0 for an executed multi() branch. That is, for a CHECKMULTISIG that # is not part of an unexecuted branch of an IF .. ENDIF. - self._dyn_ops_count = _dyn_ops + self._dyn_ops_count: int = _dyn_ops # The *maximum* number of stack elements to satisfy this Miniscript fragment # non-malleably. - self.sat_elems = sat_size + self.sat_elems: int = sat_size # The *maximum* number of stack elements to dissatisfy this Miniscript fragment # non-malleably. - self.dissat_elems = dissat_size + self.dissat_elems: int = dissat_size @property - def ops_count(self): + def ops_count(self) -> int: """ The worst-case number of OPs that would be considered executed by the Script interpreter. @@ -214,15 +232,20 @@ def ops_count(self): """ return self._static_ops_count + self._dyn_ops_count - def is_dissatisfiable(self): + def is_dissatisfiable(self) -> bool: """Whether the Miniscript is *non-malleably* dissatisfiable.""" return self.dissat_elems is not None - def set_undissatisfiable(self): + def set_undissatisfiable(self) -> None: """Set the Miniscript as being impossible to dissatisfy.""" self.dissat_elems = None - def from_concat(sub_a, sub_b, ops_count=0, disjunction=False): + def from_concat( + sub_a: ExecutionInfo, + sub_b: ExecutionInfo, + ops_count: int = 0, + disjunction: bool = False, + ) -> ExecutionInfo: """Compute the execution info from a Miniscript whose Script corresponds to a concatenation of two subscript A and B. @@ -250,7 +273,7 @@ def from_concat(sub_a, sub_b, ops_count=0, disjunction=False): return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - def from_or_uneven(sub_a, sub_b, ops_count=0): + def from_or_uneven(sub_a: ExecutionInfo, sub_b: ExecutionInfo, ops_count: int = 0) -> ExecutionInfo: """Compute the execution info from a Miniscript which always executes A and only executes B depending on the outcome of A's execution. @@ -276,7 +299,7 @@ def from_or_uneven(sub_a, sub_b, ops_count=0): return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - def from_or_even(sub_a, sub_b, ops_count): + def from_or_even(sub_a: ExecutionInfo, sub_b: ExecutionInfo, ops_count: int) -> ExecutionInfo: """Compute the execution info from a Miniscript which executes either A or B, but never both. @@ -297,7 +320,12 @@ def from_or_even(sub_a, sub_b, ops_count): return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - def from_andor_uneven(sub_a, sub_b, sub_c, ops_count=0): + def from_andor_uneven( + sub_a: ExecutionInfo, + sub_b: ExecutionInfo, + sub_c: ExecutionInfo, + ops_count: int = 0, + ) -> ExecutionInfo: """Compute the execution info from a Miniscript which always executes A, and then executes B if A returned True else executes C. Semantic: or(and(A,B), C). @@ -331,7 +359,7 @@ def from_andor_uneven(sub_a, sub_b, sub_c, ops_count=0): return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) # TODO: i think it'd be possible to not have this be special-cased to 'thresh()' - def from_thresh(k, subs): + def from_thresh(k: int, subs: typing.List[ExecutionInfo]) -> ExecutionInfo: """Compute the execution info from a Miniscript 'thresh()' fragment. Specialized to this specifc fragment for now. @@ -375,7 +403,7 @@ def from_thresh(k, subs): return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - def from_wrap(sub, ops_count, dyn=0, sat=0, dissat=0): + def from_wrap(sub: ExecutionInfo, ops_count: int, dyn: int = 0, sat: int = 0, dissat: int = 0) -> ExecutionInfo: """Compute the execution info from a Miniscript which always executes a subscript but adds some logic around. @@ -392,7 +420,13 @@ def from_wrap(sub, ops_count, dyn=0, sat=0, dissat=0): add_optional(sub.dissat_elems, dissat), ) - def from_wrap_dissat(sub, ops_count, dyn=0, sat=0, dissat=0): + def from_wrap_dissat( + sub: ExecutionInfo, + ops_count: int, + dyn: int = 0, sat: + int = 0, + dissat: int = 0, + ) -> ExecutionInfo: """Compute the execution info from a Miniscript which always executes a subscript but adds some logic around. diff --git a/bip380/utils/bignum.py b/bip380/utils/bignum.py index 1384939..8dd1128 100644 --- a/bip380/utils/bignum.py +++ b/bip380/utils/bignum.py @@ -14,14 +14,14 @@ # generic big endian MPI format -def bn_bytes(v, have_ext=False): +def bn_bytes(v: int, have_ext: bool = False) -> int: ext = 0 if have_ext: ext = 1 return ((v.bit_length() + 7) // 8) + ext -def bn2bin(v): +def bn2bin(v: int) -> bytearray: s = bytearray() i = bn_bytes(v) while i > 0: @@ -30,7 +30,7 @@ def bn2bin(v): return s -def bn2mpi(v): +def bn2mpi(v: int) -> bytes: have_ext = False if v.bit_length() > 0: have_ext = (v.bit_length() & 0x07) == 0 @@ -54,11 +54,11 @@ def bn2mpi(v): # bitcoin-specific little endian format, with implicit size -def mpi2vch(s): +def mpi2vch(s: bytes) -> bytes: r = s[4:] # strip size r = r[::-1] # reverse string, converting BE->LE return r -def bn2vch(v): +def bn2vch(v: int) -> bytes: return bytes(mpi2vch(bn2mpi(v))) diff --git a/bip380/utils/hashes.py b/bip380/utils/hashes.py index 1234db6..c9dc1ee 100644 --- a/bip380/utils/hashes.py +++ b/bip380/utils/hashes.py @@ -5,13 +5,13 @@ import hashlib -def sha256(data): +def sha256(data: bytes) -> bytes: """{data} must be bytes, returns sha256(data)""" assert isinstance(data, bytes) return hashlib.sha256(data).digest() -def hash160(data): +def hash160(data: bytes) -> bytes: """{data} must be bytes, returns ripemd160(sha256(data))""" assert isinstance(data, bytes) return hashlib.new("ripemd160", sha256(data)).digest() diff --git a/bip380/utils/script.py b/bip380/utils/script.py index c53f89f..de0befb 100644 --- a/bip380/utils/script.py +++ b/bip380/utils/script.py @@ -7,7 +7,10 @@ This file was taken from Bitcoin Core test framework, and was previously modified from python-bitcoinlib. """ +from __future__ import annotations + import struct +import typing from .bignum import bn2vch @@ -21,7 +24,7 @@ class CScriptOp(int): __slots__ = () @staticmethod - def encode_op_pushdata(d): + def encode_op_pushdata(d: typing.Union[bytes, bytearray]) -> bytes: """Encode a PUSHDATA op, returning bytes""" if len(d) < 0x4C: return b"" + bytes([len(d)]) + d # OP_PUSHDATA @@ -35,7 +38,7 @@ def encode_op_pushdata(d): raise ValueError("Data too long to encode in a PUSHDATA op") @staticmethod - def encode_op_n(n): + def encode_op_n(n: int) -> CScriptOp: """Encode a small integer op, returning an opcode""" if not (0 <= n <= 16): raise ValueError("Integer must be in range 0 <= n <= 16, got %d" % n) @@ -45,7 +48,7 @@ def encode_op_n(n): else: return CScriptOp(OP_1 + n - 1) - def decode_op_n(self): + def decode_op_n(self) -> int: """Decode a small integer opcode, returning an integer""" if self == OP_0: return 0 @@ -55,17 +58,17 @@ def decode_op_n(self): return int(self - OP_1 + 1) - def is_small_int(self): + def is_small_int(self) -> bool: """Return true if the op pushes a small integer to the stack""" if 0x51 <= self <= 0x60 or self == 0: return True else: return False - def __str__(self): + def __str__(self) -> str: return repr(self) - def __repr__(self): + def __repr__(self) -> str: if self in OPCODE_NAMES: return OPCODE_NAMES[self] else: @@ -200,11 +203,11 @@ def __repr__(self): class ScriptNumError(ValueError): - def __init__(self, message): - self.message = message + def __init__(self, message: str): + self.message: str = message -def read_script_number(data): +def read_script_number(data: bytes) -> int: """Read a Script number from {data} bytes""" size = len(data) if size > 4: @@ -235,20 +238,19 @@ class CScriptInvalidError(Exception): class CScriptTruncatedPushDataError(CScriptInvalidError): """Invalid pushdata due to truncation""" - def __init__(self, msg, data): - self.data = data + def __init__(self, msg: str, data: bytes): + self.data: bytes = data super(CScriptTruncatedPushDataError, self).__init__(msg) - # This is used, eg, for blockchain heights in coinbase scripts (bip34) class CScriptNum: __slots__ = ("value",) - def __init__(self, d=0): - self.value = d + def __init__(self, d: int = 0): + self.value: int = d @staticmethod - def encode(obj): + def encode(obj: CScriptNum) -> bytes: r = bytearray(0) if obj.value == 0: return bytes(r) @@ -264,7 +266,7 @@ def encode(obj): return bytes([len(r)]) + r @staticmethod - def decode(vch): + def decode(vch: bytes) -> int: result = 0 # We assume valid push_size and minimal encoding value = vch[1:] @@ -294,7 +296,7 @@ class CScript(bytes): __slots__ = () @classmethod - def __coerce_instance(cls, other): + def __coerce_instance(cls, other: typing.Union[CScriptOp, CScriptNum, int, bytes, bytearray]) -> bytes: # Coerce other into bytes if isinstance(other, CScriptOp): other = bytes([other]) @@ -314,7 +316,7 @@ def __coerce_instance(cls, other): other = CScriptOp.encode_op_pushdata(other) return other - def __add__(self, other): + def __add__(self, other: typing.Union[CScriptOp, CScriptNum, int, bytes, bytearray]) -> "CScript": # Do the coercion outside of the try block so that errors in it are # noticed. other = self.__coerce_instance(other) @@ -325,16 +327,16 @@ def __add__(self, other): except TypeError: raise TypeError("Can not add a %r instance to a CScript" % other.__class__) - def join(self, iterable): + def join(self, iterable: typing.List[bytes]) -> bytes: # join makes no sense for a CScript() raise NotImplementedError - def __new__(cls, value=b""): - if isinstance(value, bytes) or isinstance(value, bytearray): + def __new__(cls, value: bytes = b""): + if isinstance(value, (bytes, bytearray)): return super(CScript, cls).__new__(cls, value) else: - def coerce_iterable(iterable): + def coerce_iterable(iterable: bytes) -> typing.Generator[bytes, typing.Any, None]: for instance in iterable: yield cls.__coerce_instance(instance) @@ -342,7 +344,7 @@ def coerce_iterable(iterable): # returns a bytes instance even when subclassed. return super(CScript, cls).__new__(cls, b"".join(coerce_iterable(value))) - def raw_iter(self): + def raw_iter(self) -> typing.Tuple[int, bytes, int]: """Raw iteration Yields tuples of (opcode, data, sop_idx) so that the different possible @@ -405,7 +407,7 @@ def raw_iter(self): yield (opcode, data, sop_idx) - def __iter__(self): + def __iter__(self) -> typing.Union[CScriptOp, int, bytes]: """'Cooked' iteration Returns either a CScriptOP instance, an integer, or bytes, as @@ -425,8 +427,8 @@ def __iter__(self): else: yield CScriptOp(opcode) - def __repr__(self): - def _repr(o): + def __repr__(self) -> str: + def _repr(o: typing.Union[bytes, CScriptOp]) -> str: if isinstance(o, bytes): return "x('%s')" % o.hex() else: @@ -452,7 +454,7 @@ def _repr(o): return "CScript([%s])" % ", ".join(ops) - def GetSigOpCount(self, fAccurate): + def GetSigOpCount(self, fAccurate: bool) -> int: """Get the SigOp count. fAccurate - Accurately count CHECKMULTISIG, see BIP16 for details.