diff --git a/ethereum/README.md b/ethereum/README.md deleted file mode 100644 index d5f2dab..0000000 --- a/ethereum/README.md +++ /dev/null @@ -1 +0,0 @@ -Code forked from diff --git a/ethereum/base_types.py b/ethereum/base_types.py deleted file mode 100644 index 0ad6f19..0000000 --- a/ethereum/base_types.py +++ /dev/null @@ -1,1011 +0,0 @@ -# pasted from Github: -# - -""" -Numeric & Array Types -^^^^^^^^^^^^^^^^^^^^^ - -.. contents:: Table of Contents - :backlinks: none - :local: - -Introduction ------------- - -Integer and array types which are used by—but not unique to—Ethereum. -""" - -from __future__ import annotations - -from dataclasses import replace -from typing import Any, Callable, ClassVar, Optional, Tuple, Type, TypeVar - -U8_MAX_VALUE = (2**8) - 1 -U32_MAX_VALUE = (2**32) - 1 -U32_CEIL_VALUE = 2**32 -U64_MAX_VALUE = (2**64) - 1 -U255_MAX_VALUE = (2**255) - 1 -U255_CEIL_VALUE = 2**255 -U256_MAX_VALUE = (2**256) - 1 -U256_CEIL_VALUE = 2**256 - - -class Uint(int): - """ - Unsigned positive integer. - """ - - __slots__ = () - - @classmethod - def from_be_bytes(cls: Type, buffer: "Bytes") -> "Uint": - """ - Converts a sequence of bytes into an arbitrarily sized unsigned integer - from its big endian representation. - Parameters - ---------- - buffer : - Bytes to decode. - Returns - ------- - self : `Uint` - Unsigned integer decoded from `buffer`. - """ - return cls(int.from_bytes(buffer, "big")) - - @classmethod - def from_le_bytes(cls: Type, buffer: "Bytes") -> "Uint": - """ - Convert a series of little endian bytes to an unsigned integer. - """ - return cls(int.from_bytes(buffer, "little")) - - def __init__(self, value: int) -> None: - if not isinstance(value, int): - raise TypeError() - - if value < 0: - raise ValueError() - - def __radd__(self, left: int) -> "Uint": - return self.__add__(left) - - def __add__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__add__(self, right)) - - def __iadd__(self, right: int) -> "Uint": - return self.__add__(right) - - def __sub__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0 or self < right: - raise ValueError() - - return int.__new__(self.__class__, int.__sub__(self, right)) - - def __rsub__(self, left: int) -> "Uint": - if not isinstance(left, int): - return NotImplemented - - if left < 0 or self > left: - raise ValueError() - - return int.__new__(self.__class__, int.__rsub__(self, left)) - - def __isub__(self, right: int) -> "Uint": - return self.__sub__(right) - - def __mul__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__mul__(self, right)) - - def __rmul__(self, left: int) -> "Uint": - return self.__mul__(left) - - def __imul__(self, right: int) -> "Uint": - return self.__mul__(right) - - # Explicitly don't override __truediv__, __rtruediv__, and __itruediv__ - # since they return floats anyway. - - def __floordiv__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__floordiv__(self, right)) - - def __rfloordiv__(self, left: int) -> "Uint": - if not isinstance(left, int): - return NotImplemented - - if left < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__rfloordiv__(self, left)) - - def __ifloordiv__(self, right: int) -> "Uint": - return self.__floordiv__(right) - - def __mod__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__mod__(self, right)) - - def __rmod__(self, left: int) -> "Uint": - if not isinstance(left, int): - return NotImplemented - - if left < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__rmod__(self, left)) - - def __imod__(self, right: int) -> "Uint": - return self.__mod__(right) - - def __divmod__(self, right: int) -> Tuple["Uint", "Uint"]: - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - result = int.__divmod__(self, right) - return ( - int.__new__(self.__class__, result[0]), - int.__new__(self.__class__, result[1]), - ) - - def __rdivmod__(self, left: int) -> Tuple["Uint", "Uint"]: - if not isinstance(left, int): - return NotImplemented - - if left < 0: - raise ValueError() - - result = int.__rdivmod__(self, left) - return ( - int.__new__(self.__class__, result[0]), - int.__new__(self.__class__, result[1]), - ) - - def __pow__(self, right: int, modulo: Optional[int] = None) -> "Uint": - if modulo is not None: - if not isinstance(modulo, int): - return NotImplemented - - if modulo < 0: - raise ValueError() - - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__pow__(self, right, modulo)) - - def __rpow__(self, left: int, modulo: Optional[int] = None) -> "Uint": - if modulo is not None: - if not isinstance(modulo, int): - return NotImplemented - - if modulo < 0: - raise ValueError() - - if not isinstance(left, int): - return NotImplemented - - if left < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__rpow__(self, left, modulo)) - - def __ipow__(self, right: int, modulo: Optional[int] = None) -> "Uint": - return self.__pow__(right, modulo) - - def __xor__(self, right: int) -> "Uint": - if not isinstance(right, int): - return NotImplemented - - if right < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__xor__(self, right)) - - def __rxor__(self, left: int) -> "Uint": - if not isinstance(left, int): - return NotImplemented - - if left < 0: - raise ValueError() - - return int.__new__(self.__class__, int.__rxor__(self, left)) - - def __ixor__(self, right: int) -> "Uint": - return self.__xor__(right) - - # TODO: Implement and, or, neg, pos, abs, invert, ... - - def to_be_bytes32(self) -> "Bytes32": - """ - Converts this arbitrarily sized unsigned integer into its big endian - representation with exactly 32 bytes. - Returns - ------- - big_endian : `Bytes32` - Big endian (most significant bits first) representation. - """ - return Bytes32(self.to_bytes(32, "big")) - - def to_be_bytes(self) -> "Bytes": - """ - Converts this arbitrarily sized unsigned integer into its big endian - representation. - Returns - ------- - big_endian : `Bytes` - Big endian (most significant bits first) representation. - """ - bit_length = self.bit_length() - byte_length = (bit_length + 7) // 8 - return self.to_bytes(byte_length, "big") - - def to_le_bytes(self, number_bytes: int = None) -> "Bytes": - """ - Converts this arbitrarily sized unsigned integer into its little endian - representation. - - Parameters - ---------- - number_bytes : - Exact number of bytes to return (defaults to the fewest that can - represent this number.) - - Returns - ------- - little_endian : `Bytes` - Little endian (most significant bits last) representation. - """ - if number_bytes is None: - bit_length = self.bit_length() - number_bytes = (bit_length + 7) // 8 - return self.to_bytes(number_bytes, "little") - - -T = TypeVar("T", bound="FixedUInt") - - -class FixedUInt(int): - """ - Superclass for fixed size unsigned integers. Not intended to be used - directly, but rather to be subclassed. - """ - - MAX_VALUE: ClassVar["FixedUInt"] - - __slots__ = () - - def __init__(self: T, value: int) -> None: - if not isinstance(value, int): - raise TypeError() - - if value < 0 or value > self.MAX_VALUE: - raise ValueError() - - def __radd__(self: T, left: int) -> T: - return self.__add__(left) - - def __add__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - result = int.__add__(self, right) - - if right < 0 or result > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, result) - - def wrapping_add(self: T, right: int) -> T: - """ - Return a new instance containing `self + right (mod N)`. - - Parameters - ---------- - - right : - Other operand for addition. - - Returns - ------- - - sum : T - The result of adding `self` and `right`, wrapped. - """ - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - # This is a fast way of ensuring that the result is < (2 ** 256) - return int.__new__( - self.__class__, int.__add__(self, right) & self.MAX_VALUE - ) - - def __iadd__(self: T, right: int) -> T: - return self.__add__(right) - - def __sub__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE or self < right: - raise ValueError() - - return int.__new__(self.__class__, int.__sub__(self, right)) - - def wrapping_sub(self: T, right: int) -> T: - """ - Return a new instance containing `self - right (mod N)`. - - Parameters - ---------- - - right : - Subtrahend operand for subtraction. - - Returns - ------- - - difference : T - The result of subtracting `right` from `self`, wrapped. - """ - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - # This is a fast way of ensuring that the result is < (2 ** 256) - return int.__new__( - self.__class__, int.__sub__(self, right) & self.MAX_VALUE - ) - - def __rsub__(self: T, left: int) -> T: - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE or self > left: - raise ValueError() - - return int.__new__(self.__class__, int.__rsub__(self, left)) - - def __isub__(self: T, right: int) -> T: - return self.__sub__(right) - - def __mul__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - result = int.__mul__(self, right) - - if right < 0 or result > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, result) - - def wrapping_mul(self: T, right: int) -> T: - """ - Return a new instance containing `self * right (mod N)`. - - Parameters - ---------- - - right : - Other operand for multiplication. - - Returns - ------- - - product : T - The result of multiplying `self` by `right`, wrapped. - """ - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - # This is a fast way of ensuring that the result is < (2 ** 256) - return int.__new__( - self.__class__, int.__mul__(self, right) & self.MAX_VALUE - ) - - def __rmul__(self: T, left: int) -> T: - return self.__mul__(left) - - def __imul__(self: T, right: int) -> T: - return self.__mul__(right) - - # Explicitly don't override __truediv__, __rtruediv__, and __itruediv__ - # since they return floats anyway. - - def __floordiv__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__floordiv__(self, right)) - - def __rfloordiv__(self: T, left: int) -> T: - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__rfloordiv__(self, left)) - - def __ifloordiv__(self: T, right: int) -> T: - return self.__floordiv__(right) - - def __mod__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__mod__(self, right)) - - def __rmod__(self: T, left: int) -> T: - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__rmod__(self, left)) - - def __imod__(self: T, right: int) -> T: - return self.__mod__(right) - - def __divmod__(self: T, right: int) -> Tuple[T, T]: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - result = super(FixedUInt, self).__divmod__(right) - return ( - int.__new__(self.__class__, result[0]), - int.__new__(self.__class__, result[1]), - ) - - def __rdivmod__(self: T, left: int) -> Tuple[T, T]: - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE: - raise ValueError() - - result = super(FixedUInt, self).__rdivmod__(left) - return ( - int.__new__(self.__class__, result[0]), - int.__new__(self.__class__, result[1]), - ) - - def __pow__(self: T, right: int, modulo: Optional[int] = None) -> T: - if modulo is not None: - if not isinstance(modulo, int): - return NotImplemented - - if modulo < 0 or modulo > self.MAX_VALUE: - raise ValueError() - - if not isinstance(right, int): - return NotImplemented - - result = int.__pow__(self, right, modulo) - - if right < 0 or right > self.MAX_VALUE or result > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, result) - - def wrapping_pow(self: T, right: int, modulo: Optional[int] = None) -> T: - """ - Return a new instance containing `self ** right (mod modulo)`. - - Parameters - ---------- - - right : - Exponent operand. - - modulo : - Optional modulus (defaults to `MAX_VALUE + 1`.) - - Returns - ------- - - power : T - The result of raising `self` to the power of `right`, wrapped. - """ - if modulo is not None: - if not isinstance(modulo, int): - return NotImplemented - - if modulo < 0 or modulo > self.MAX_VALUE: - raise ValueError() - - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - # This is a fast way of ensuring that the result is < (2 ** 256) - return int.__new__( - self.__class__, int.__pow__(self, right, modulo) & self.MAX_VALUE - ) - - def __rpow__(self: T, left: int, modulo: Optional[int] = None) -> T: - if modulo is not None: - if not isinstance(modulo, int): - return NotImplemented - - if modulo < 0 or modulo > self.MAX_VALUE: - raise ValueError() - - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__rpow__(self, left, modulo)) - - def __ipow__(self: T, right: int, modulo: Optional[int] = None) -> T: - return self.__pow__(right, modulo) - - def __and__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__and__(self, right)) - - def __or__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__or__(self, right)) - - def __xor__(self: T, right: int) -> T: - if not isinstance(right, int): - return NotImplemented - - if right < 0 or right > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__xor__(self, right)) - - def __rxor__(self: T, left: int) -> T: - if not isinstance(left, int): - return NotImplemented - - if left < 0 or left > self.MAX_VALUE: - raise ValueError() - - return int.__new__(self.__class__, int.__rxor__(self, left)) - - def __ixor__(self: T, right: int) -> T: - return self.__xor__(right) - - def __invert__(self: T) -> T: - return int.__new__( - self.__class__, int.__invert__(self) & self.MAX_VALUE - ) - - def __rshift__(self: T, shift_by: int) -> T: - if not isinstance(shift_by, int): - return NotImplemented - return int.__new__(self.__class__, int.__rshift__(self, shift_by)) - - def to_be_bytes(self) -> "Bytes": - """ - Converts this unsigned integer into its big endian representation, - omitting leading zero bytes. - - Returns - ------- - big_endian : `Bytes` - Big endian (most significant bits first) representation. - """ - bit_length = self.bit_length() - byte_length = (bit_length + 7) // 8 - return self.to_bytes(byte_length, "big") - - # TODO: Implement neg, pos, abs ... - - -class U256(FixedUInt): - """ - Unsigned positive integer, which can represent `0` to `2 ** 256 - 1`, - inclusive. - """ - - MAX_VALUE: ClassVar["U256"] - - __slots__ = () - - @classmethod - def from_be_bytes(cls: Type, buffer: "Bytes") -> "U256": - """ - Converts a sequence of bytes into an arbitrarily sized unsigned integer - from its big endian representation. - Parameters - ---------- - buffer : - Bytes to decode. - Returns - ------- - self : `U256` - Unsigned integer decoded from `buffer`. - """ - if len(buffer) > 32: - raise ValueError() - - return cls(int.from_bytes(buffer, "big")) - - @classmethod - def from_signed(cls: Type, value: int) -> "U256": - """ - Converts a signed number into a 256-bit unsigned integer. - Parameters - ---------- - value : - Signed number - Returns - ------- - self : `U256` - Unsigned integer obtained from `value`. - """ - if value >= 0: - return cls(value) - - return cls(value & cls.MAX_VALUE) - - def to_be_bytes32(self) -> "Bytes32": - """ - Converts this 256-bit unsigned integer into its big endian - representation with exactly 32 bytes. - Returns - ------- - big_endian : `Bytes32` - Big endian (most significant bits first) representation. - """ - return Bytes32(self.to_bytes(32, "big")) - - def to_signed(self) -> int: - """ - Converts this 256-bit unsigned integer into a signed integer. - Returns - ------- - signed_int : `int` - Signed integer obtained from 256-bit unsigned integer. - """ - if self <= U255_MAX_VALUE: - # This means that the sign bit is 0 - return int(self) - - # -1 * (2's complement of U256 value) - return int(self) - U256_CEIL_VALUE - - -U256.MAX_VALUE = int.__new__(U256, U256_MAX_VALUE) -"""autoapi_noindex""" - - -class U32(FixedUInt): - """ - Unsigned positive integer, which can represent `0` to `2 ** 32 - 1`, - inclusive. - """ - - MAX_VALUE: ClassVar["U32"] - - __slots__ = () - - @classmethod - def from_le_bytes(cls: Type, buffer: "Bytes") -> "U32": - """ - Converts a sequence of bytes into an arbitrarily sized unsigned integer - from its little endian representation. - """ - if len(buffer) > 4: - raise ValueError() - - return cls(int.from_bytes(buffer, "little")) - - def to_le_bytes4(self) -> "Bytes4": - """ - Converts this fixed sized unsigned integer into its little endian - representation, with exactly 4 bytes. - - Returns - ------- - little_endian : `Bytes4` - Little endian (most significant bits last) representation. - """ - return Bytes4(self.to_bytes(4, "little")) - - def to_le_bytes(self) -> "Bytes": - """ - Converts this fixed sized unsigned integer into its little endian - representation, in the fewest bytes possible. - - Returns - ------- - little_endian : `Bytes` - Little endian (most significant bits last) representation. - """ - bit_length = self.bit_length() - byte_length = (bit_length + 7) // 8 - return self.to_bytes(byte_length, "little") - - -U32.MAX_VALUE = int.__new__(U32, U32_MAX_VALUE) -"""autoapi_noindex""" - - -class U64(FixedUInt): - """ - Unsigned positive integer, which can represent `0` to `2 ** 64 - 1`, - inclusive. - """ - - MAX_VALUE: ClassVar["U64"] - - __slots__ = () - - @classmethod - def from_le_bytes(cls: Type, buffer: "Bytes") -> "U64": - """ - Converts a sequence of bytes into an arbitrarily sized unsigned integer - from its little endian representation. - """ - if len(buffer) > 8: - raise ValueError() - - return cls(int.from_bytes(buffer, "little")) - - def to_le_bytes8(self) -> "Bytes8": - """ - Converts this fixed sized unsigned integer into its little endian - representation, with exactly 8 bytes. - - Returns - ------- - little_endian : `Bytes8` - Little endian (most significant bits last) representation. - """ - return Bytes8(self.to_bytes(8, "little")) - - def to_le_bytes(self) -> "Bytes": - """ - Converts this fixed sized unsigned integer into its little endian - representation, in the fewest bytes possible. - - Returns - ------- - little_endian : `Bytes` - Little endian (most significant bits last) representation. - """ - bit_length = self.bit_length() - byte_length = (bit_length + 7) // 8 - return self.to_bytes(byte_length, "little") - - @classmethod - def from_be_bytes(cls: Type, buffer: "Bytes") -> "U64": - """ - Converts a sequence of bytes into an unsigned 64 bit integer from its - big endian representation. - - Parameters - ---------- - buffer : - Bytes to decode. - Returns - ------- - self : `U64` - Unsigned integer decoded from `buffer`. - """ - if len(buffer) > 8: - raise ValueError() - - return cls(int.from_bytes(buffer, "big")) - - -U64.MAX_VALUE = int.__new__(U64, U64_MAX_VALUE) -"""autoapi_noindex""" - - -B = TypeVar("B", bound="FixedBytes") - - -class FixedBytes(bytes): - """ - Superclass for fixed sized byte arrays. Not intended to be used directly, - but should be subclassed. - """ - - LENGTH: int - - __slots__ = () - - def __new__(cls: Type[B], *args: Any, **kwargs: Any) -> B: - """ - Create a new instance, ensuring the result has the correct length. - """ - result = super(FixedBytes, cls).__new__(cls, *args, **kwargs) - if len(result) != cls.LENGTH: - raise ValueError( - f"expected {cls.LENGTH} bytes but got {len(result)}" - ) - return result - - -class Bytes0(FixedBytes): - """ - Byte array of exactly zero elements. - """ - - LENGTH = 0 - - -class Bytes4(FixedBytes): - """ - Byte array of exactly four elements. - """ - - LENGTH = 4 - - -class Bytes8(FixedBytes): - """ - Byte array of exactly eight elements. - """ - - LENGTH = 8 - - -class Bytes20(FixedBytes): - """ - Byte array of exactly 20 elements. - """ - - LENGTH = 20 - - -class Bytes32(FixedBytes): - """ - Byte array of exactly 32 elements. - """ - - LENGTH = 32 - - -class Bytes64(FixedBytes): - """ - Byte array of exactly 64 elements. - """ - - LENGTH = 64 - - -class Bytes256(FixedBytes): - """ - Byte array of exactly 256 elements. - """ - - LENGTH = 256 - - -Bytes = bytes - - -def _setattr_function(self: Any, attr: str, value: Any) -> None: - if getattr(self, "_frozen", None): - raise Exception("Mutating frozen dataclasses is not allowed.") - else: - object.__setattr__(self, attr, value) - - -def _delattr_function(self: Any, attr: str) -> None: - if self._frozen: - raise Exception("Mutating frozen dataclasses is not allowed.") - else: - object.__delattr__(self, attr) - - -def _make_init_function(f: Callable) -> Callable: - def init_function(self: Any, *args: Any, **kwargs: Any) -> None: - will_be_frozen = kwargs.pop("_frozen", True) - object.__setattr__(self, "_frozen", False) - f(self, *args, **kwargs) - self._frozen = will_be_frozen - - return init_function - - -def slotted_freezable(cls: Any) -> Any: - """ - Monkey patches a dataclass so it can be frozen by setting `_frozen` to - `True` and uses `__slots__` for efficiency. - - Instances will be created frozen by default unless you pass `_frozen=False` - to `__init__`. - """ - cls.__slots__ = ("_frozen",) + tuple(cls.__annotations__) - cls.__init__ = _make_init_function(cls.__init__) - cls.__setattr__ = _setattr_function - cls.__delattr__ = _delattr_function - return type(cls)(cls.__name__, cls.__bases__, dict(cls.__dict__)) - - -S = TypeVar("S") - - -def modify(obj: S, f: Callable[[S], None]) -> S: - """ - Create a mutable copy of `obj` (which must be `@slotted_freezable`) and - apply `f` to the copy before freezing it. - - Parameters - ---------- - obj : `S` - Object to copy. - f : `Callable[[S], None]` - Function to apply to `obj`. - - Returns - ------- - new_obj : `S` - Compact byte array. - """ - new_obj = replace(obj, _frozen=False) - f(new_obj) - new_obj._frozen = True # type: ignore - return new_obj \ No newline at end of file diff --git a/ethereum/crypto_hash.py b/ethereum/crypto_hash.py deleted file mode 100644 index 67b45a8..0000000 --- a/ethereum/crypto_hash.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Cryptographic Hash Functions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. contents:: Table of Contents - :backlinks: none - :local: - -Introduction ------------- - -Cryptographic hashing functions. -""" - -from Crypto.Hash import keccak - -from .base_types import Bytes, Bytes32, Bytes64 - -Hash32 = Bytes32 -Hash64 = Bytes64 - - -def keccak256(buffer: Bytes) -> Hash32: - """ - Computes the keccak256 hash of the input `buffer`. - - Parameters - ---------- - buffer : - Input for the hashing function. - - Returns - ------- - hash : `ethereum.base_types.Hash32` - Output of the hash function. - """ - k = keccak.new(digest_bits=256) - return Hash32(k.update(buffer).digest()) - - -def keccak512(buffer: Bytes) -> Hash64: - """ - Computes the keccak512 hash of the input `buffer`. - - Parameters - ---------- - buffer : - Input for the hashing function. - - Returns - ------- - hash : `ethereum.base_types.Hash32` - Output of the hash function. - """ - k = keccak.new(digest_bits=512) - return Hash64(k.update(buffer).digest()) \ No newline at end of file diff --git a/ethereum/exceptions.py b/ethereum/exceptions.py deleted file mode 100644 index 5f9fdcf..0000000 --- a/ethereum/exceptions.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -Exceptions -^^^^^^^^^^ - -.. contents:: Table of Contents - :backlinks: none - :local: - -Introduction ------------- - -The Ethereum specification exception classes. -""" - - -class EthereumException(Exception): - """ - The base class from which all exceptions thrown by the specification during - normal operation derive. - """ - - -class InvalidBlock(EthereumException): - """ - Thrown when a block being processed is found to be invalid. - """ - - -class RLPDecodingError(InvalidBlock): - """ - Indicates that RLP decoding failed. - """ - - -class RLPEncodingError(EthereumException): - """ - Indicates that RLP encoding failed. - """ \ No newline at end of file diff --git a/ethereum/rlp.py b/ethereum/rlp.py deleted file mode 100644 index dadf93c..0000000 --- a/ethereum/rlp.py +++ /dev/null @@ -1,512 +0,0 @@ -""" -.. _rlp: - -Recursive Length Prefix (RLP) Encoding -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. contents:: Table of Contents - :backlinks: none - :local: - -Introduction ------------- - -Defines the serialization and deserialization format used throughout Ethereum. -""" - -from __future__ import annotations - -from dataclasses import astuple, fields, is_dataclass -from typing import Any, List, Sequence, Tuple, Type, TypeVar, Union - -from .crypto_hash import Hash32, keccak256 -from .exceptions import RLPDecodingError, RLPEncodingError -from .utils_ensure import ensure - -from fiber.ethereum.base_types import Bytes, Bytes0, Bytes20, FixedBytes, FixedUInt, Uint - -RLP = Any - - -# -# RLP Encode -# - - -def encode(raw_data: RLP) -> Bytes: - """ - Encodes `raw_data` into a sequence of bytes using RLP. - - Parameters - ---------- - raw_data : - A `Bytes`, `Uint`, `Uint256` or sequence of `RLP` encodable - objects. - - Returns - ------- - encoded : `ethereum.base_types.Bytes` - The RLP encoded bytes representing `raw_data`. - """ - if isinstance(raw_data, (bytearray, bytes)): - return encode_bytes(raw_data) - elif isinstance(raw_data, (Uint, FixedUInt)): - return encode(raw_data.to_be_bytes()) - elif isinstance(raw_data, str): - return encode_bytes(raw_data.encode()) - elif isinstance(raw_data, bool): - if raw_data: - return encode_bytes(b"\x01") - else: - return encode_bytes(b"") - elif isinstance(raw_data, Sequence): - return encode_sequence(raw_data) - elif is_dataclass(raw_data): - return encode(astuple(raw_data)) - else: - raise RLPEncodingError( - "RLP Encoding of type {} is not supported".format(type(raw_data)) - ) - - -def encode_bytes(raw_bytes: Bytes) -> Bytes: - """ - Encodes `raw_bytes`, a sequence of bytes, using RLP. - - Parameters - ---------- - raw_bytes : - Bytes to encode with RLP. - - Returns - ------- - encoded : `ethereum.base_types.Bytes` - The RLP encoded bytes representing `raw_bytes`. - """ - len_raw_data = Uint(len(raw_bytes)) - - if len_raw_data == 1 and raw_bytes[0] < 0x80: - return raw_bytes - elif len_raw_data < 0x38: - return bytes([0x80 + len_raw_data]) + raw_bytes - else: - # length of raw data represented as big endian bytes - len_raw_data_as_be = len_raw_data.to_be_bytes() - return ( - bytes([0xB7 + len(len_raw_data_as_be)]) - + len_raw_data_as_be - + raw_bytes - ) - - -def encode_sequence(raw_sequence: Sequence[RLP]) -> Bytes: - """ - Encodes a list of RLP encodable objects (`raw_sequence`) using RLP. - - Parameters - ---------- - raw_sequence : - Sequence of RLP encodable objects. - - Returns - ------- - encoded : `ethereum.base_types.Bytes` - The RLP encoded bytes representing `raw_sequence`. - """ - joined_encodings = get_joined_encodings(raw_sequence) - len_joined_encodings = Uint(len(joined_encodings)) - - if len_joined_encodings < 0x38: - return Bytes([0xC0 + len_joined_encodings]) + joined_encodings - else: - len_joined_encodings_as_be = len_joined_encodings.to_be_bytes() - return ( - Bytes([0xF7 + len(len_joined_encodings_as_be)]) - + len_joined_encodings_as_be - + joined_encodings - ) - - -def get_joined_encodings(raw_sequence: Sequence[RLP]) -> Bytes: - """ - Obtain concatenation of rlp encoding for each item in the sequence - raw_sequence. - - Parameters - ---------- - raw_sequence : - Sequence to encode with RLP. - - Returns - ------- - joined_encodings : `ethereum.base_types.Bytes` - The concatenated RLP encoded bytes for each item in sequence - raw_sequence. - """ - return b"".join(encode(item) for item in raw_sequence) - - -# -# RLP Decode -# - - -def decode(encoded_data: Bytes) -> RLP: - """ - Decodes an integer, byte sequence, or list of RLP encodable objects - from the byte sequence `encoded_data`, using RLP. - - Parameters - ---------- - encoded_data : - A sequence of bytes, in RLP form. - - Returns - ------- - decoded_data : `RLP` - Object decoded from `encoded_data`. - """ - # Raising error as there can never be empty encoded data for any - # given raw data (including empty raw data) - # RLP Encoding(b'') -> [0x80] # noqa: SC100 - # RLP Encoding([]) -> [0xc0] # noqa: SC100 - ensure( - len(encoded_data) > 0, - RLPDecodingError("Cannot decode empty bytestring"), - ) - - if encoded_data[0] <= 0xBF: - # This means that the raw data is of type bytes - return decode_to_bytes(encoded_data) - else: - # This means that the raw data is of type sequence - return decode_to_sequence(encoded_data) - - -T = TypeVar("T") - - -def decode_to(cls: Type[T], encoded_data: Bytes) -> T: - """ - Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be - a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`. - - Parameters - ---------- - cls: `Type[T]` - The type to decode to. - encoded_data : - A sequence of bytes, in RLP form. - - Returns - ------- - decoded_data : `T` - Object decoded from `encoded_data`. - """ - return _decode_to(cls, decode(encoded_data)) - - -def _decode_to(cls: Type[T], raw_rlp: RLP) -> T: - """ - Decode the rlp structure in `encoded_data` to an object of type `cls`. - `cls` can be a `Bytes` subclass, a dataclass, `Uint`, `U256`, - `Tuple[cls, ...]`, `Tuple[cls1, cls2]` or `Union[Bytes, cls]`. - - Parameters - ---------- - cls: `Type[T]` - The type to decode to. - raw_rlp : - A decoded rlp structure. - - Returns - ------- - decoded_data : `T` - Object decoded from `encoded_data`. - """ - if isinstance(cls, type(Tuple[Uint, ...])) and cls._name == "Tuple": # type: ignore # noqa: E501 - ensure(type(raw_rlp) == list, RLPDecodingError) - if cls.__args__[1] == ...: # type: ignore - args = [] - for raw_item in raw_rlp: - args.append(_decode_to(cls.__args__[0], raw_item)) # type: ignore # noqa: E501 - return tuple(args) # type: ignore - else: - args = [] - ensure(len(raw_rlp) == len(cls.__args__), RLPDecodingError) # type: ignore # noqa: E501 - for (t, raw_item) in zip(cls.__args__, raw_rlp): # type: ignore - args.append(_decode_to(t, raw_item)) - return tuple(args) # type: ignore - elif cls == Union[Bytes0, Bytes20]: - # We can't support Union types in general, so we support this one - # (which appears in the Transaction type) as a special case - ensure(type(raw_rlp) == Bytes, RLPDecodingError) - if len(raw_rlp) == 0: - return Bytes0() # type: ignore - elif len(raw_rlp) == 20: - return Bytes20(raw_rlp) # type: ignore - else: - raise RLPDecodingError( - "Bytes has length {}, expected 0 or 20".format(len(raw_rlp)) - ) - elif isinstance(cls, type(List[Bytes])) and cls._name == "List": # type: ignore # noqa: E501 - ensure(type(raw_rlp) == list, RLPDecodingError) - items = [] - for raw_item in raw_rlp: - items.append(_decode_to(cls.__args__[0], raw_item)) # type: ignore - return items # type: ignore - elif isinstance(cls, type(Union[Bytes, List[Bytes]])) and cls.__origin__ == Union: # type: ignore # noqa: E501 - if len(cls.__args__) != 2 or Bytes not in cls.__args__: # type: ignore - raise RLPDecodingError( - "RLP Decoding to type {} is not supported".format(cls) - ) - if isinstance(raw_rlp, Bytes): - return raw_rlp # type: ignore - elif cls.__args__[0] == Bytes: # type: ignore - return _decode_to(cls.__args__[1], raw_rlp) # type: ignore - else: - return _decode_to(cls.__args__[0], raw_rlp) # type: ignore - elif issubclass(cls, bool): - if raw_rlp == b"\x01": - return cls(True) # type: ignore - elif raw_rlp == b"": - return cls(False) # type: ignore - else: - raise TypeError("Cannot decode {} as {}".format(raw_rlp, cls)) - elif issubclass(cls, FixedBytes): - ensure(type(raw_rlp) == Bytes, RLPDecodingError) - ensure(len(raw_rlp) == cls.LENGTH, RLPDecodingError) - return raw_rlp - elif issubclass(cls, Bytes): - ensure(type(raw_rlp) == Bytes, RLPDecodingError) - return raw_rlp - elif issubclass(cls, (Uint, FixedUInt)): - ensure(type(raw_rlp) == Bytes, RLPDecodingError) - try: - return cls.from_be_bytes(raw_rlp) # type: ignore - except ValueError: - raise RLPDecodingError - elif is_dataclass(cls): - ensure(type(raw_rlp) == list, RLPDecodingError) - assert isinstance(raw_rlp, list) - args = [] - ensure(len(fields(cls)) == len(raw_rlp), RLPDecodingError) - for (field, rlp_item) in zip(fields(cls), raw_rlp): - args.append(_decode_to(field.type, rlp_item)) - return cls(*args) - else: - raise RLPDecodingError( - "RLP Decoding to type {} is not supported".format(cls) - ) - - -def decode_to_bytes(encoded_bytes: Bytes) -> Bytes: - """ - Decodes a rlp encoded byte stream assuming that the decoded data - should be of type `bytes`. - - Parameters - ---------- - encoded_bytes : - RLP encoded byte stream. - - Returns - ------- - decoded : `ethereum.base_types.Bytes` - RLP decoded Bytes data - """ - if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80: - return encoded_bytes - elif encoded_bytes[0] <= 0xB7: - len_raw_data = encoded_bytes[0] - 0x80 - ensure(len_raw_data < len(encoded_bytes), RLPDecodingError) - raw_data = encoded_bytes[1 : 1 + len_raw_data] - ensure( - not (len_raw_data == 1 and raw_data[0] < 0x80), RLPDecodingError - ) - return raw_data - else: - # This is the index in the encoded data at which decoded data - # starts from. - decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7 - ensure( - decoded_data_start_idx - 1 < len(encoded_bytes), RLPDecodingError - ) - # Expectation is that the big endian bytes shouldn't start with 0 - # while trying to decode using RLP, in which case is an error. - ensure(encoded_bytes[1] != 0, RLPDecodingError) - len_decoded_data = Uint.from_be_bytes( - encoded_bytes[1:decoded_data_start_idx] - ) - ensure(len_decoded_data >= 0x38, RLPDecodingError) - decoded_data_end_idx = decoded_data_start_idx + len_decoded_data - ensure(decoded_data_end_idx - 1 < len(encoded_bytes), RLPDecodingError) - return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx] - - -def decode_to_sequence(encoded_sequence: Bytes) -> List[RLP]: - """ - Decodes a rlp encoded byte stream assuming that the decoded data - should be of type `Sequence` of objects. - - Parameters - ---------- - encoded_sequence : - An RLP encoded Sequence. - - Returns - ------- - decoded : `Sequence[RLP]` - Sequence of objects decoded from `encoded_sequence`. - """ - if encoded_sequence[0] <= 0xF7: - len_joined_encodings = encoded_sequence[0] - 0xC0 - ensure(len_joined_encodings < len(encoded_sequence), RLPDecodingError) - joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings] - else: - joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7 - ensure( - joined_encodings_start_idx - 1 < len(encoded_sequence), - RLPDecodingError, - ) - # Expectation is that the big endian bytes shouldn't start with 0 - # while trying to decode using RLP, in which case is an error. - ensure(encoded_sequence[1] != 0, RLPDecodingError) - len_joined_encodings = Uint.from_be_bytes( - encoded_sequence[1:joined_encodings_start_idx] - ) - ensure(len_joined_encodings >= 0x38, RLPDecodingError) - joined_encodings_end_idx = ( - joined_encodings_start_idx + len_joined_encodings - ) - ensure( - joined_encodings_end_idx - 1 < len(encoded_sequence), - RLPDecodingError, - ) - joined_encodings = encoded_sequence[ - joined_encodings_start_idx:joined_encodings_end_idx - ] - - return decode_joined_encodings(joined_encodings) - - -def decode_joined_encodings(joined_encodings: Bytes) -> List[RLP]: - """ - Decodes `joined_encodings`, which is a concatenation of RLP encoded - objects. - - Parameters - ---------- - joined_encodings : - concatenation of RLP encoded objects - - Returns - ------- - decoded : `List[RLP]` - A list of objects decoded from `joined_encodings`. - """ - decoded_sequence = [] - - item_start_idx = 0 - while item_start_idx < len(joined_encodings): - encoded_item_length = decode_item_length( - joined_encodings[item_start_idx:] - ) - ensure( - item_start_idx + encoded_item_length - 1 < len(joined_encodings), - RLPDecodingError, - ) - encoded_item = joined_encodings[ - item_start_idx : item_start_idx + encoded_item_length - ] - decoded_sequence.append(decode(encoded_item)) - item_start_idx += encoded_item_length - - return decoded_sequence - - -def decode_item_length(encoded_data: Bytes) -> int: - """ - Find the length of the rlp encoding for the first object in the - encoded sequence. - Here `encoded_data` refers to concatenation of rlp encoding for each - item in a sequence. - - NOTE - This is a helper function not described in the spec. It was - introduced as the spec doesn't discuss about decoding the RLP encoded - data. - - Parameters - ---------- - encoded_data : - RLP encoded data for a sequence of objects. - - Returns - ------- - rlp_length : `int` - """ - # Can't decode item length for empty encoding - ensure(len(encoded_data) > 0, RLPDecodingError) - - first_rlp_byte = Uint(encoded_data[0]) - - # This is the length of the big endian representation of the length of - # rlp encoded object byte stream. - length_length = Uint(0) - decoded_data_length = 0 - - # This occurs only when the raw_data is a single byte whose value < 128 - if first_rlp_byte < 0x80: - # We return 1 here, as the end formula - # 1 + length_length + decoded_data_length would be invalid for - # this case. - return 1 - # This occurs only when the raw_data is a byte stream with length < 56 - # and doesn't fall into the above cases - elif first_rlp_byte <= 0xB7: - decoded_data_length = first_rlp_byte - 0x80 - # This occurs only when the raw_data is a byte stream and doesn't fall - # into the above cases - elif first_rlp_byte <= 0xBF: - length_length = first_rlp_byte - 0xB7 - ensure(length_length < len(encoded_data), RLPDecodingError) - # Expectation is that the big endian bytes shouldn't start with 0 - # while trying to decode using RLP, in which case is an error. - ensure(encoded_data[1] != 0, RLPDecodingError) - decoded_data_length = Uint.from_be_bytes( - encoded_data[1 : 1 + length_length] - ) - # This occurs only when the raw_data is a sequence of objects with - # length(concatenation of encoding of each object) < 56 - elif first_rlp_byte <= 0xF7: - decoded_data_length = first_rlp_byte - 0xC0 - # This occurs only when the raw_data is a sequence of objects and - # doesn't fall into the above cases. - elif first_rlp_byte <= 0xFF: - length_length = first_rlp_byte - 0xF7 - ensure(length_length < len(encoded_data), RLPDecodingError) - # Expectation is that the big endian bytes shouldn't start with 0 - # while trying to decode using RLP, in which case is an error. - ensure(encoded_data[1] != 0, RLPDecodingError) - decoded_data_length = Uint.from_be_bytes( - encoded_data[1 : 1 + length_length] - ) - - return 1 + length_length + decoded_data_length - - -def rlp_hash(data: RLP) -> Hash32: - """ - Obtain the keccak-256 hash of the rlp encoding of the passed in data. - - Parameters - ---------- - data : - The data for which we need the rlp hash. - - Returns - ------- - hash : `Hash32` - The rlp hash of the passed in data. - """ - return keccak256(encode(data)) \ No newline at end of file diff --git a/ethereum/utils_ensure.py b/ethereum/utils_ensure.py deleted file mode 100644 index 934018c..0000000 --- a/ethereum/utils_ensure.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -Ensure (Assertion) Utilities -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. contents:: Table of Contents - :backlinks: none - :local: - -Introduction ------------- - -Functions that simplify checking assertions and raising exceptions. -""" - -from typing import Type, Union - - -def ensure( - value: bool, exception: Union[Type[BaseException], BaseException] -) -> None: - """ - Does nothing if `value` is truthy, otherwise raises the exception returned - by `exception_class`. - - Parameters - ---------- - - value : - Value that should be true. - - exception : - Constructor for the exception to raise. - """ - if value: - return - raise exception \ No newline at end of file diff --git a/fiber/types.py b/fiber/types.py index dd858c3..335440b 100644 --- a/fiber/types.py +++ b/fiber/types.py @@ -1,19 +1,12 @@ -from eth_utils import encode_hex, big_endian_to_int, to_bytes +from eth_utils import encode_hex, to_bytes from typing import Any, List, Optional, Tuple from eth_typing import HexStr -from ethereum.crypto_hash import keccak256 import fiber.rlp as rlp from fiber.proto import eth_pb2 from fiber.proto import api_pb2 -from fiber.ethereum.base_types import ( - U64, - U256, - Bytes0, - Bytes20, - Bytes32, - Uint -) +from fiber.ethereum.crypto_hash import keccak256 +from fiber.ethereum.base_types import (Bytes20, Bytes32) def hex_to_bytes(data: str) -> bytes: