Skip to content

Commit

Permalink
Index EM transactions for dashboard wallet users C-3529 (#6969)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikki Kang <kangaroo233@gmail.com>
  • Loading branch information
nicoback2 and nicoback authored Dec 21, 2023
1 parent 276540b commit 6ea1880
Show file tree
Hide file tree
Showing 9 changed files with 908 additions and 30 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from unittest.mock import call, create_autospec

from payment_router_mock_transactions import (
mock_valid_track_purchase_single_recipient_tx,
mock_valid_track_purchase_single_recipient_pay_extra_tx,
mock_valid_track_purchase_multi_recipient_tx,
mock_valid_track_purchase_multi_recipient_pay_extra_tx,
mock_invalid_track_purchase_insufficient_split_tx,
mock_valid_transfer_without_purchase_single_recipient_tx,
mock_valid_transfer_without_purchase_multi_recipient_tx,
mock_failed_track_purchase_single_recipient_tx,
mock_non_route_transfer_purchase_single_recipient_tx,
mock_invalid_track_purchase_bad_PDA_account_single_recipient_tx,
mock_invalid_track_purchase_insufficient_split_tx,
mock_non_route_transfer_purchase_single_recipient_tx,
mock_valid_track_purchase_multi_recipient_pay_extra_tx,
mock_valid_track_purchase_multi_recipient_tx,
mock_valid_track_purchase_single_recipient_pay_extra_tx,
mock_valid_track_purchase_single_recipient_tx,
mock_valid_transfer_without_purchase_multi_recipient_tx,
mock_valid_transfer_without_purchase_single_recipient_tx,
)

from integration_tests.utils import populate_mock_db
Expand All @@ -20,10 +20,9 @@
from src.models.users.usdc_purchase import PurchaseType, USDCPurchase
from src.models.users.usdc_transactions_history import (
USDCTransactionMethod,
USDCTransactionType,
USDCTransactionsHistory,
USDCTransactionType,
)

from src.tasks.index_payment_router import process_payment_router_tx_details
from src.utils.db_session import get_db

Expand Down
16 changes: 16 additions & 0 deletions packages/discovery-provider/integration_tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime

from src.models.dashboard_wallet_user.dashboard_wallet_user import DashboardWalletUser
from src.models.grants.developer_app import DeveloperApp
from src.models.grants.grant import Grant
from src.models.indexing.block import Block
Expand Down Expand Up @@ -120,6 +121,7 @@ def populate_mock_db(db, entities, block_offset=None):
users = entities.get("users", [])
developer_apps = entities.get("developer_apps", [])
grants = entities.get("grants", [])
dashboard_wallet_users = entities.get("dashboard_wallet_users", [])
follows = entities.get("follows", [])
subscriptions = entities.get("subscriptions", [])
reposts = entities.get("reposts", [])
Expand Down Expand Up @@ -162,6 +164,7 @@ def populate_mock_db(db, entities, block_offset=None):
len(users),
len(developer_apps),
len(grants),
len(dashboard_wallet_users),
len(follows),
len(saves),
len(reposts),
Expand Down Expand Up @@ -339,6 +342,19 @@ def populate_mock_db(db, entities, block_offset=None):
)
session.add(grant)

for i, dashboard_wallet_user_meta in enumerate(dashboard_wallet_users):
dashboard_wallet_user = DashboardWalletUser(
user_id=dashboard_wallet_user_meta.get("user_id", i),
wallet=dashboard_wallet_user_meta.get("wallet", str(i)),
is_delete=dashboard_wallet_user_meta.get("is_delete", False),
blockhash=hex(i + block_offset),
blocknumber=(i + block_offset),
txhash=dashboard_wallet_user_meta.get("txhash", str(i + block_offset)),
updated_at=dashboard_wallet_user_meta.get("updated_at", datetime.now()),
created_at=dashboard_wallet_user_meta.get("created_at", datetime.now()),
)
session.add(dashboard_wallet_user)

for i, follow_meta in enumerate(follows):
follow = Follow(
blockhash=hex(i + block_offset),
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Integer,
String,
Text,
text,
)

from src.models.base import Base
from src.models.model_utils import RepresentableMixin


class DashboardWalletUser(Base, RepresentableMixin):
__tablename__ = "dashboard_wallet_users"

wallet = Column(String, primary_key=True, nullable=False)
user_id = Column(Integer, nullable=False)
is_delete = Column(Boolean, nullable=False, server_default=text("false"))
created_at = Column(DateTime, nullable=False)
updated_at = Column(DateTime, nullable=False)
txhash = Column(
String,
primary_key=True,
nullable=False,
server_default=text("''::character varying"),
)
blockhash = Column(Text, ForeignKey("blocks.blockhash"), nullable=False)
blocknumber = Column(Integer, ForeignKey("blocks.number"), nullable=False)

def get_attributes_dict(self):
return {col.name: getattr(self, col.name) for col in self.__table__.columns}
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import json
import time
from typing import Optional, TypedDict, Union, cast

from src.exceptions import IndexingValidationError
from src.models.dashboard_wallet_user.dashboard_wallet_user import DashboardWalletUser
from src.tasks.entity_manager.utils import (
Action,
EntityType,
ManageEntityParameters,
copy_record,
get_address_from_signature,
)
from src.utils.helpers import decode_string_id
from src.utils.indexing_errors import EntityMissingRequiredFieldError
from src.utils.model_nullable_validator import all_required_fields_present
from src.utils.structured_logger import StructuredLogger

logger = StructuredLogger(__name__)


class Signature(TypedDict):
message: str
signature: str


class CreateDashboardWalletUserMetadata(TypedDict):
wallet_signature: Union[Signature, None]
wallet: Union[str, None]


class DeleteDashboardWalletUserMetadata(TypedDict):
wallet: Union[str, None]


def is_within_5_minutes(timestamp_str):
current_timestamp = int(time.time())
input_timestamp = int(timestamp_str)
time_difference = abs(current_timestamp - input_timestamp)
return time_difference < 5 * 60


def matches_user_id(hash_or_int_id, int_id):
return hash_or_int_id == str(int_id) or decode_string_id(hash_or_int_id) == int_id


def get_create_dashboard_wallet_user_metadata_from_raw(
raw_metadata: Optional[str],
) -> Optional[CreateDashboardWalletUserMetadata]:
metadata: CreateDashboardWalletUserMetadata = {
"wallet": None,
"wallet_signature": None,
}
if raw_metadata:
try:
json_metadata = json.loads(raw_metadata)

metadata["wallet_signature"] = json_metadata.get("wallet_signature", None)
raw_wallet = json_metadata.get("wallet", None)
if raw_wallet:
metadata["wallet"] = raw_wallet.lower()

return metadata
except Exception as e:
logger.error(
f"entity_manager | developer_app.py | Unable to parse dashboard wallet user metadata while indexing: {e}"
)
return None
return metadata


def get_delete_dashboard_wallet_user_metadata_from_raw(
raw_metadata: Optional[str],
) -> Optional[DeleteDashboardWalletUserMetadata]:
metadata: DeleteDashboardWalletUserMetadata = {"wallet": None}
if raw_metadata:
try:
json_metadata = json.loads(raw_metadata)

raw_wallet = json_metadata.get("wallet", None)
if raw_wallet:
metadata["wallet"] = raw_wallet.lower()

return metadata
except Exception as e:
logger.error(
f"entity_manager | developer_app.py | Unable to parse dashboard wallet user metadata while indexing: {e}"
)
return None
return metadata


def validate_dashboard_wallet_user_tx(params: ManageEntityParameters, metadata):
user_id = params.user_id
dashboard_wallet = metadata["wallet"]

if params.entity_type != EntityType.DASHBOARD_WALLET_USER:
raise IndexingValidationError(
f"Invalid Dashboard Wallet User Transaction, wrong entity type {params.entity_type}"
)
if not dashboard_wallet:
raise IndexingValidationError(
f"Invalid {params.action} Dashboard Wallet User Transaction, dashboard wallet address is required and was not provided"
)
if not user_id:
raise IndexingValidationError(
f"Invalid {params.action} Dashboard Wallet User Transaction, user id is required and was not provided"
)
if user_id not in params.existing_records["User"]:
raise IndexingValidationError(
f"Invalid {params.action} Dashboard Wallet User Transaction, user id {user_id} does not exist"
)

user_wallet = params.existing_records["User"][user_id].wallet
user_matches_signer = user_wallet and user_wallet.lower() == params.signer.lower()
if params.action == Action.DELETE:
if dashboard_wallet not in params.existing_records["DashboardWalletUser"]:
raise IndexingValidationError(
f"Invalid Delete Dashboard Wallet User Transaction, dashboard wallet user with wallet {dashboard_wallet} does not exist"
)
# Either the user or the dashboard wallet can sign the Delete tx
if not user_matches_signer and not params.signer.lower() == dashboard_wallet:
raise IndexingValidationError(
"Invalid Delete Dashboard Wallet User Transaction, signature does not match user or dashboard wallet"
)
# If the user is the one who signed the tx, make sure it matches the user id assigned to the wallet
if (
user_matches_signer
and not user_id
== params.existing_records["DashboardWalletUser"][dashboard_wallet].user_id
):
raise IndexingValidationError(
"Invalid Delete Dashboard Wallet User Transaction, user is not assigned to this wallet"
)
elif params.action == Action.CREATE:
if not user_matches_signer:
raise IndexingValidationError(
"Invalid Create Dashboard Wallet User Transaction, signature does not match user"
)
if not metadata["wallet_signature"]:
raise IndexingValidationError(
"Invalid Create Dashboard Wallet User Transaction, wallet signature is required and was not provided"
)
if (
dashboard_wallet in params.existing_records["DashboardWalletUser"]
and params.existing_records["DashboardWalletUser"][
dashboard_wallet
].is_delete
== False
):
raise IndexingValidationError(
f"Invalid Create Dashboard Wallet User Transaction, dashboard wallet {dashboard_wallet} already has an assigned user"
)
# Expect wallet_signature message to be "Connecting Audius user id {user hash id} at {timestamp}"
if (
not isinstance(metadata["wallet_signature"], dict)
or not metadata["wallet_signature"]
.get("message", "")
.startswith("Connecting Audius user id")
or not matches_user_id(
(metadata["wallet_signature"].get("message", "").split()[-3]), user_id
)
or not is_within_5_minutes(
(metadata["wallet_signature"].get("message", "").split())[-1]
)
):
raise IndexingValidationError(
"Invalid Create Dashboard Wallet Transaction, wallet signature provided does not have correct message"
)
try:
signature_address = get_address_from_signature(metadata["wallet_signature"])
except:
raise IndexingValidationError(
"Invalid Create Dashboard Wallet User Transaction, signature provided is invalid"
)
if not signature_address or not signature_address.lower() == dashboard_wallet:
raise IndexingValidationError(
"Invalid Create Dashboard Wallet User Transaction, signature provided is invalid"
)
else:
raise IndexingValidationError(
f"Invalid Dashboard Wallet User Transaction, action {params.action} is not valid"
)


def validate_dashboard_wallet_user_record(dashboard_wallet_user):
if not all_required_fields_present(DashboardWalletUser, dashboard_wallet_user):
raise EntityMissingRequiredFieldError(
"dashboard_wallet_user",
dashboard_wallet_user,
f"Error parsing {dashboard_wallet_user} with entity missing required field(s)",
)

return dashboard_wallet_user


def create_dashboard_wallet_user(params: ManageEntityParameters):
metadata = get_create_dashboard_wallet_user_metadata_from_raw(params.metadata)
if not metadata:
raise IndexingValidationError(
"Invalid Dashboard Wallet User Transaction, unable to parse metadata"
)
validate_dashboard_wallet_user_tx(params, metadata)
user_id = params.user_id
dashboard_wallet = metadata["wallet"]

dashboard_wallet_user_record = DashboardWalletUser(
user_id=user_id,
wallet=cast(
str, dashboard_wallet
), # cast to assert non null (since we validated above)
txhash=params.txhash,
blockhash=params.event_blockhash,
blocknumber=params.block_number,
updated_at=params.block_datetime,
created_at=params.block_datetime,
)

validate_dashboard_wallet_user_record(dashboard_wallet_user_record)
params.add_record(dashboard_wallet, dashboard_wallet_user_record)
return dashboard_wallet_user_record


def delete_dashboard_wallet_user(params: ManageEntityParameters):
metadata = get_delete_dashboard_wallet_user_metadata_from_raw(params.metadata)
if not metadata:
raise IndexingValidationError(
"Invalid Dashboard Wallet User Transaction, unable to parse metadata"
)
validate_dashboard_wallet_user_tx(params, metadata)
dashboard_wallet = cast(str, metadata["wallet"])
existing_dashboard_wallet_user = params.existing_records["DashboardWalletUser"][
dashboard_wallet
]
if dashboard_wallet in params.new_records["DashboardWalletUser"]:
existing_dashboard_wallet_user = params.new_records["DashboardWalletUser"][
dashboard_wallet
][-1]

deleted_dashboard_wallet_user = copy_record(
existing_dashboard_wallet_user,
params.block_number,
params.event_blockhash,
params.txhash,
params.block_datetime,
)

deleted_dashboard_wallet_user.is_delete = True

validate_dashboard_wallet_user_record(deleted_dashboard_wallet_user)
params.add_record(dashboard_wallet, deleted_dashboard_wallet_user)
return deleted_dashboard_wallet_user
Loading

0 comments on commit 6ea1880

Please sign in to comment.