diff --git a/.gitmodules b/.gitmodules
index 64302f7..00d4647 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
 [submodule "ton-index-cpp"]
 	path = ton-index-cpp
 	url = https://github.com/toncenter/ton-index-worker
+[submodule "ton-index-go"]
+	path = ton-index-go
+	url = https://github.com/toncenter/ton-index-go
diff --git a/README.md b/README.md
index 50cf084..07aa1a4 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,9 @@
 > [!NOTE]
 > This repository's master branch hosts the TON Indexer designed for direct reading from the TON Node database. If you are looking for an indexer that operates through `tonlib + liteserver`, please refer to the branch [legacy](https://github.com/toncenter/ton-indexer/tree/legacy).
 
+> [!NOTE]
+> Be careful when upgrading the indexer: versions that differ in major or minor version use different, mutually incompatible database schemas.
+
 TON Indexer stores blocks, transactions, messages, NFTs, Jettons and DNS domains in a PostgreSQL database and provides a convenient API.
 
@@ -11,7 +14,7 @@ TON node stores data in a key-value database RocksDB. While RocksDB excels in s
 TON Indexer stack consists of:
 1. `postgres`: PostgreSQL server to store indexed data and perform queries.
 2. `index-api`: FastAPI server with convenient endpoints to access the database.
-3. `alembic`: alembic service to run database migrations.
+3. `event-classifier`: trace classification service.
 4. `index-worker`: TON Index worker to read and parse data from TON node database. This service must be run on the machine with a working TON Node.
 
 ## How to run
@@ -30,8 +33,8 @@ Do the following steps to setup TON Indexer:
   * ./configure.sh will create .env file only with indexer and PostgreSQL configuration data. Use --worker flag to add TON Index worker configuration data too.
 * Adjust parameters in *.env* file (see [list of available parameters](#available-parameters)).
 * Set PostgreSQL password `echo -n "MY_PASSWORD" > private/postgres_password`
-* Build docker images: `docker compose build postgres alembic index-api`.
-* Run stack: `docker compose up -d postgres alembic index-api`.
+* Build docker images: `docker compose build postgres event-classifier index-api`.
+* Run stack: `docker compose up -d postgres event-classifier event-cache index-api`.
 * To start worker use command `docker compose up -d index-worker` after creating all services.
 
 **NOTE:** we recommend setting up the indexer stack and the index worker on separate servers. To install the index worker as a **Systemd** service check this [instruction](https://github.com/toncenter/ton-index-worker).
@@ -53,9 +56,7 @@ Do the following steps to setup TON Indexer:
 * TON worker parameters:
   * `TON_WORKER_DBROOT`: path to TON full node database. Use default value if you've installed node with `mytonctrl`. Default: `/var/ton-work/db`.
   * `TON_WORKER_FROM`: masterchain seqno to start indexing. Set 1 to full index, set last masterchain seqno to collect only new blocks (use [/api/v2/getMasterchainInfo](https://toncenter.com/api/v2/getMasterchainInfo) to get last masterchain block seqno).
-  * `TON_WORKER_MAX_PARALLEL_TASKS`: max parallel reading actors. Adjust this parameter to decrease RAM usage. Default: `1024`.
-  * `TON_WORKER_INSERT_BATCH_SIZE`: max masterchain seqnos per INSERT query. Small value will decrease indexing performance. Great value will increase RAM usage. Default: `512`.
-  * `TON_WORKER_INSERT_PARALLEL_ACTORS`: number of parallel INSERT transactions. Increasing this number will increase PostgreSQL server RAM usage. Default: `3`.
+ * `TON_WORKER_ADDITIONAL_ARGS`: additional args to pass into index worker. ## Swagger @@ -66,13 +67,12 @@ To test API, built-in swagger can be used. It is available after running `docker ## How to point TON Index worker to existing PostgreSQL instance * Remove PostgreSQL container: `docker compose rm postgres` (add flag `-v` to remove volumes). * Setup PostgreSQL credentials in *.env* file. -* Run alembic migration: `docker compose up alembic`. -* Run index worker: `docker compose up -d index-worker`. +* Run index worker: `docker compose up -d index-worker index-api event-classifier event-cache`. ## How to update code * Pull new commits: `git pull`. * Update submodules: `git submodule update --recursive --init`. -* Build new image: `docker compose build postgres alembic index-api`. +* Build new image: `docker compose build postgres event-classifier event-cache index-api`. * Build new image of worker: `docker compose build index-worker` -* Run new version: `docker compose up -d postgres alembic index-api` +* Run new version: `docker compose up -d postgres event-classifier event-cache index-api` * Run new version of worker: `docker compose up -d index-worker` diff --git a/configure.sh b/configure.sh index b27320f..2ef3f55 100755 --- a/configure.sh +++ b/configure.sh @@ -47,8 +47,6 @@ TON_INDEXER_WORKERS=4 TON_INDEXER_TON_HTTP_API_ENDPOINT=${TON_INDEXER_TON_HTTP_API_ENDPOINT} -TON_WORKER_DBROOT= - EOF if [[ "$WORKER" -eq 1 ]]; then @@ -57,8 +55,10 @@ echo "Configure Worker" cat <> .env TON_WORKER_DBROOT=${TON_WORKER_DBROOT:-/var/ton-work/db/} TON_WORKER_FROM=${TON_WORKER_FROM:-1} -TON_WORKER_MAX_PARALLEL_TASKS=${TON_WORKER_MAX_PARALLEL_TASKS:-1024} -TON_WORKER_INSERT_BATCH_SIZE=${TON_WORKER_INSERT_BATCH_SIZE:-512} -TON_WORKER_INSERT_PARALLEL_ACTORS=${TON_WORKER_INSERT_PARALLEL_ACTORS:-3} +TON_WORKER_ADDITIONAL_ARGS=${TON_WORKER_ADDITIONAL_ARGS} +EOF +else +cat <> .env +TON_WORKER_DBROOT= EOF fi diff --git a/docker-compose.alembic.yaml b/docker-compose.alembic.yaml deleted file mode 100644 index 16b8df3..0000000 --- a/docker-compose.alembic.yaml +++ /dev/null @@ -1,46 +0,0 @@ -version: '3.9' - - -x-indexer-environment: &indexer-environment - POSTGRES_DIALECT: ${POSTGRES_DIALECT:-postgresql+asyncpg} - POSTGRES_HOST: - POSTGRES_PORT: - POSTGRES_USER: - POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password - POSTGRES_DBNAME: - TON_INDEXER_API_ROOT_PATH: - -services: - alembic: - image: ${DOCKER_REGISTRY:-docker.io/toncenter}/ton-indexer-cpp:${IMAGE_TAG:?} - build: - context: indexer - dockerfile: Dockerfile - environment: *indexer-environment - secrets: - - postgres_password - command: alembic upgrade head-1 - deploy: - mode: replicated - replicas: 1 - placement: - constraints: - - "node.labels.${TONCENTER_ENV:?}.indexer-cpp.api==true" - restart_policy: - condition: on-failure - delay: 3s - max_attempts: 3 - window: 60s - networks: - internal: - -networks: - internal: - attachable: true - external: false - driver_opts: - com.docker.network.driver.mtu: 1350 - -secrets: - postgres_password: - file: ${POSTGRES_PASSWORD_FILE:-private/postgres_password} diff --git a/docker-compose.api.yaml b/docker-compose.api.yaml index c87633d..2e008d7 100644 --- a/docker-compose.api.yaml +++ b/docker-compose.api.yaml @@ -1,7 +1,7 @@ version: '3.9' x-indexer-environment: &indexer-environment - POSTGRES_DIALECT: ${POSTGRES_DIALECT:-postgresql+asyncpg} + POSTGRES_DIALECT: ${POSTGRES_DIALECT:-postgresql} POSTGRES_HOST: POSTGRES_PORT: POSTGRES_USER: @@ -13,31 +13,32 @@ x-indexer-environment: &indexer-environment 
 TON_INDEXER_IS_TESTNET:
 
 services:
-  index-api:
-    image: ${DOCKER_REGISTRY:-localhost:5000}/ton-indexer-cpp:${IMAGE_TAG:?}
+  index-api-go:
+    image: ${DOCKER_REGISTRY:-localhost:5000}/ton-index-go:${IMAGE_TAG:?}
     build:
-      context: indexer
+      context: ton-index-go
       dockerfile: Dockerfile
     secrets:
       - postgres_password
-    command: gunicorn indexer.api.main:app -k uvicorn.workers.UvicornWorker --bind=0.0.0.0:8081 -w ${TON_INDEXER_WORKERS:-1}
+    command: -prefork -threads ${TON_INDEXER_WORKERS:-1}
+    init: true
     ports:
       - target: 8081
         published: ${TON_INDEXER_API_PORT:-8081}
         protocol: tcp
         mode: host
-    environment: *indexer-environment 
+    environment: *indexer-environment
     deploy:
       mode: replicated
       replicas: ${SWARM_REPLICAS:-1}
       placement:
         constraints:
-          - "node.labels.${TONCENTER_ENV:?}.indexer-cpp.api==true"
+          - "node.labels.${TONCENTER_ENV:?}.indexer-cpp.api-go==true"
     networks:
       internal:
       toncenter-global:
         aliases:
-          - ${TONCENTER_ENV:?}-indexer-cpp-api
+          - ${TONCENTER_ENV:?}-indexer-cpp-api-go
 
 networks:
   internal:
@@ -51,4 +52,3 @@ networks:
 secrets:
   postgres_password:
     file: ${POSTGRES_PASSWORD_FILE:-private/postgres_password}
-
\ No newline at end of file
diff --git a/docker-compose.database.yaml b/docker-compose.database.yaml
index 4d102ab..6f0a509 100644
--- a/docker-compose.database.yaml
+++ b/docker-compose.database.yaml
@@ -2,7 +2,7 @@ version: '3.9'
 
 services:
   postgres:
-    image: postgres:15
+    image: postgres:16
     environment:
       POSTGRES_USER:
       POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password
diff --git a/docker-compose.events.yaml b/docker-compose.events.yaml
new file mode 100644
index 0000000..7ca4ef3
--- /dev/null
+++ b/docker-compose.events.yaml
@@ -0,0 +1,57 @@
+version: '3.9'
+
+x-indexer-environment: &indexer-environment
+  POSTGRES_DIALECT:
+  POSTGRES_HOST:
+  POSTGRES_PORT:
+  POSTGRES_USER:
+  POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password
+  POSTGRES_DBNAME:
+  TON_INDEXER_API_ROOT_PATH:
+  TON_INDEXER_API_TITLE:
+  TON_INDEXER_TON_HTTP_API_ENDPOINT:
+  TON_INDEXER_REDIS_DSN: redis://event-cache:6379
+
+services:
+  event-cache:
+    image: redis:latest
+    networks:
+      - internal
+    command: redis-server --maxclients 40000
+    deploy:
+      mode: replicated
+      replicas: 1
+      placement:
+        constraints:
+          - "node.labels.${TONCENTER_ENV:?}.indexer-cpp.events==true"
+  event-classifier:
+    image: ${DOCKER_REGISTRY:-localhost:5000}/ton-index-event-classifier:${IMAGE_TAG:?}
+    build:
+      context: indexer
+      dockerfile: Dockerfile
+    secrets:
+      - postgres_password
+    command: python3 /app/event_classifier.py --pool-size ${TON_INDEXER_WORKERS:-8} --fetch-size 64000 --batch-size 2000
+    environment: *indexer-environment
+    deploy:
+      mode: replicated
+      replicas: 1
+      placement:
+        constraints:
+          - "node.labels.${TONCENTER_ENV:?}.indexer-cpp.events==true"
+    networks:
+      internal:
+
+networks:
+  internal:
+    attachable: true
+    external: false
+    driver_opts:
+      com.docker.network.driver.mtu: 1350
+  toncenter-global:
+    external: true
+
+secrets:
+  postgres_password:
+    file: private/postgres_password
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 668b998..eabff2a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -15,43 +15,47 @@ x-index-api: &index-api
   TON_INDEXER_API_PORT:
   TON_INDEXER_TON_HTTP_API_ENDPOINT:
   TON_INDEXER_IS_TESTNET:
+  TON_INDEXER_REDIS_DSN: redis://event-cache:6379
   <<: *index-common
 
 x-index-worker: &index-worker
   TON_WORKER_FROM:
+  TON_WORKER_DBROOT: /tondb
+  TON_WORKER_BINARY: ton-index-postgres-v2
+  TON_WORKER_ADDITIONAL_ARGS:
   <<: *index-common
 
 services:
-
index-api: + event-cache: + image: redis:latest + networks: + - internal + command: redis-server --maxclients 40000 + event-classifier: build: context: indexer dockerfile: Dockerfile secrets: - postgres_password - command: gunicorn indexer.api.main:app -k uvicorn.workers.UvicornWorker --bind=0.0.0.0:8081 -w ${TON_INDEXER_WORKERS:-1} - ports: - - target: 8081 - published: ${TON_INDEXER_API_PORT:-8081} - environment: *index-api - restart: always + command: python3 /app/event_classifier.py --pool-size ${TON_INDEXER_WORKERS:-8} --fetch-size 10000 --batch-size 500 + environment: *index-api networks: internal: - depends_on: - - alembic - alembic: + index-api: build: - context: indexer + context: ton-index-go dockerfile: Dockerfile - environment: *index-api secrets: - postgres_password - command: alembic upgrade head-1 + command: -prefork -threads ${TON_INDEXER_WORKERS:-1} + ports: + - target: 8081 + published: ${TON_INDEXER_API_PORT:-8081} + environment: *index-api + restart: always networks: internal: - depends_on: - postgres: - condition: service_healthy index-worker: build: @@ -66,9 +70,6 @@ services: internal: command: --from ${TON_WORKER_FROM:-1} restart: unless-stopped - depends_on: - alembic: - condition: service_completed_successfully postgres: image: postgres:16 diff --git a/indexer/Dockerfile b/indexer/Dockerfile index 478c827..45b1bef 100644 --- a/indexer/Dockerfile +++ b/indexer/Dockerfile @@ -1,8 +1,4 @@ -FROM ubuntu:20.04 - -RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get update && apt-get -y install tzdata && rm -rf /var/lib/{apt,dpkg,cache,log}/ -RUN apt-get update && apt-get install -y git cmake wget python3 python3-pip && rm -rf /var/lib/{apt,dpkg,cache,log}/ -RUN python3 -m pip install -U pip +FROM python:3.12-bookworm # python requirements ADD requirements.txt /tmp/requirements.txt diff --git a/indexer/classifier.Dockerfile b/indexer/classifier.Dockerfile new file mode 100644 index 0000000..018dc32 --- /dev/null +++ b/indexer/classifier.Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.10 + +# python requirements +ADD requirements.txt /tmp/requirements.txt +RUN python3 -m pip install --no-cache-dir -r /tmp/requirements.txt + +# app +COPY . /app +WORKDIR /app + +# entrypoint +ENV C_FORCE_ROOT 1 +ENTRYPOINT [ "/app/entrypoint.sh" ] diff --git a/indexer/entrypoint.sh b/indexer/entrypoint.sh index 748e04d..a9d198e 100755 --- a/indexer/entrypoint.sh +++ b/indexer/entrypoint.sh @@ -13,13 +13,14 @@ else echo "Postgres password file not specified!" 
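   # without a readable password file the DSN exports below cannot be built, so fail fast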
exit 1 fi - -if [ "$POSTGRES_PASSWORD" -eq 0 ]; then +export TQDM_NCOLS=0 +export TQDM_POSITION=-1 +if [ -z "$POSTGRES_PASSWORD" ]; then echo "Using postgres connection without password" - export TON_INDEXER_PG_DSN="${POSTGRES_DIALECT}://${POSTGRES_USER}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DBNAME}" + export TON_INDEXER_PG_DSN="${POSTGRES_DIALECT:-postgresql+asyncpg}://${POSTGRES_USER}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DBNAME}" else echo "Using postgres connection with password" - export TON_INDEXER_PG_DSN="${POSTGRES_DIALECT}://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DBNAME}" + export TON_INDEXER_PG_DSN="${POSTGRES_DIALECT:-postgresql+asyncpg}://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DBNAME}" fi printenv diff --git a/indexer/event_classifier.py b/indexer/event_classifier.py new file mode 100644 index 0000000..4a6ac96 --- /dev/null +++ b/indexer/event_classifier.py @@ -0,0 +1,220 @@ +import argparse +import asyncio +import logging +import multiprocessing as mp +import sys +import time +import threading + +from queue import Queue + +from sqlalchemy import update, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import sessionmaker, contains_eager + +from indexer.core import redis +from indexer.core.database import engine, Trace, Transaction, Message, Action, TraceEdge, SyncSessionMaker +from indexer.core.settings import Settings +from indexer.events import context +from indexer.events.blocks.utils.block_tree_serializer import block_to_action +from indexer.events.blocks.utils.event_deserializer import deserialize_event +from indexer.events.event_processing import process_event_async +from indexer.events.interface_repository import EmulatedTransactionsInterfaceRepository, gather_interfaces, \ + RedisInterfaceRepository + +async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) +settings = Settings() + + +async def start_processing_events_from_db(args: argparse.Namespace): + global lt, count + logger.info(f"Creating pool of {args.pool_size} workers") + + queue = mp.Queue(2 * args.fetch_size) + thread = mp.Process(target=fetch_events_for_processing, args=(queue, 1000)) + thread.start() + big_traces = 0 + count = 0 + lt = 0 + with mp.Pool(args.pool_size, initializer=init_pool) as pool: + while True: + async with async_session() as session: + start_wait = time.time() + batch = [] + while time.time() - start_wait < 1.0 or len(batch) < args.fetch_size: + try: + item = queue.get(False) + batch.append(item) + except: + await asyncio.sleep(0.5) + if (time.time() - lt) > 5 and lt > 0: + logger.info(f"Processed {count} traces in {time.time() - lt:02f} seconds. 
Traces/sec: {count / (time.time() - lt):02f}, queue size: {queue.qsize()}, big traces: {big_traces}") + else: + logger.info(f'Processing first batch of {len(batch)} traces, queue size: {queue.qsize()}') + ids = [] + has_traces_to_process = False + total_nodes = 0 + for (trace_id, nodes) in batch: + if nodes > 4000 or nodes == 0: + if nodes > 0: + big_traces += 1 + await session.execute(update(Trace) + .where(Trace.trace_id == trace_id) + .values(classification_state='broken')) + has_traces_to_process = True + else: + total_nodes += nodes + has_traces_to_process = True + ids.append(trace_id) + await session.commit() + if count == 0: + lt = time.time() + count += len(ids) + if has_traces_to_process: + pool.map(process_event_batch, split_into_batches(ids, args.batch_size)) + else: + await asyncio.sleep(0.5) + thread.join() +# end def + +def init_pool(): + asyncio.set_event_loop(asyncio.new_event_loop()) + + +def split_into_batches(data, batch_size): + for i in range(0, len(data), batch_size): + yield data[i:i + batch_size] + + +async def start_emulated_traces_processing(): + pubsub = redis.client.pubsub() + await pubsub.subscribe(settings.emulated_traces_reddit_channel) + while True: + message = await pubsub.get_message() + if message is not None and message['type'] == 'message': + trace_id = message['data'].decode('utf-8') + try: + start = time.time() + res = await process_emulated_trace(trace_id) + except Exception as e: + logger.error(f"Failed to process emulated trace {trace_id}: {e}") + else: + await asyncio.sleep(1) + + +async def process_emulated_trace(trace_id): + trace_map = await redis.client.hgetall(trace_id) + trace_map = dict((str(key, encoding='utf-8'), value) for key, value in trace_map.items()) + trace = deserialize_event(trace_id, trace_map) + context.interface_repository.set(EmulatedTransactionsInterfaceRepository(trace_map)) + return await process_event_async(trace) + + +def fetch_events_for_processing(queue: mp.Queue, fetch_size: int): + logger.info(f'fetching unclassified traces...') + while True: + with SyncSessionMaker() as session: + query = session.query(Trace.trace_id, Trace.nodes_) \ + .filter(Trace.state == 'complete') \ + .filter(Trace.classification_state == 'unclassified') \ + .order_by(Trace.start_lt.desc()) + query = query.yield_per(fetch_size) + for item in query: + queue.put(item) + time.sleep(1) +# end def + + +def process_event_batch(ids: list[str]): + asyncio.get_event_loop().run_until_complete(process_trace_batch_async(ids)) + return None + + +async def process_trace_batch_async(ids: list[str]): + async with async_session() as session: + query = select(Trace) \ + .join(Trace.transactions) \ + .join(Transaction.messages, isouter=True) \ + .join(Message.message_content, isouter=True) \ + .options(contains_eager(Trace.transactions, Transaction.messages, Message.message_content)) \ + .filter(Trace.trace_id.in_(ids)) + result = await session.execute(query) + traces = result.scalars().unique().all() + + # Gather interfaces for each account + accounts = set() + for trace in traces: + for tx in trace.transactions: + accounts.add(tx.account) + interfaces = await gather_interfaces(accounts, session) + repository = RedisInterfaceRepository(redis.sync_client) + await repository.put_interfaces(interfaces) + context.interface_repository.set(repository) + # Process traces and save actions + results = await asyncio.gather(*(process_trace(trace) for trace in traces)) + ok_traces = [] + failed_traces = [] + broken_traces = [] + for trace_id, state, actions in results: + 
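+            # persist actions for both fully ('ok') and partially ('broken') classified traces; 'failed' traces store no actions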
if state == 'ok' or state == 'broken': + session.add_all(actions) + if state == 'ok': + ok_traces.append(trace_id) + else: + broken_traces.append(trace_id) + else: + failed_traces.append(trace_id) + stmt = update(Trace).where(Trace.trace_id.in_(ok_traces)).values(classification_state='ok') + await session.execute(stmt) + if len(broken_traces) > 0: + stmt = update(Trace).where(Trace.trace_id.in_(broken_traces)).values(classification_state='broken') + await session.execute(stmt) + stmt = update(Trace).where(Trace.trace_id.in_(failed_traces)).values(classification_state='failed') + await session.execute(stmt) + await session.commit() + + +async def process_trace(trace: Trace) -> tuple[str, str, list[Action]]: + if len(trace.transactions) == 1 and trace.transactions[0].descr == 'tick_tock': + return trace.trace_id, 'ok', [] + try: + result = await process_event_async(trace) + actions = [] + state = 'ok' + for block in result.bfs_iter(): + if block.btype != 'root': + if block.btype == 'call_contract' and block.event_nodes[0].message.destination is None: + continue + if block.broken: + state = 'broken' + action = block_to_action(block, trace.trace_id) + actions.append(action) + return trace.trace_id, state, actions + except Exception as e: + logger.error("Marking trace as failed " + trace.trace_id + " - " + str(e)) + return trace.trace_id, 'failed', [] + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--fetch-size', + help='Number of traces to fetch from db in one batch', + type=int, + default=10000) + parser.add_argument('--batch-size', + help='Number of traces to process in one batch', + type=int, + default=1000) + parser.add_argument('--pool-size', + help='Number of workers to process traces', + type=int, + default=4) + args = parser.parse_args() + if settings.emulated_traces: + logger.info("Starting processing emulated traces") + asyncio.run(start_emulated_traces_processing()) + else: + logger.info("Starting processing events from db") + asyncio.run(start_processing_events_from_db(args)) diff --git a/indexer/indexer/core/database.py b/indexer/indexer/core/database.py index 2e77a0b..ae6ed22 100644 --- a/indexer/indexer/core/database.py +++ b/indexer/indexer/core/database.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging from time import sleep @@ -6,7 +8,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker -from sqlalchemy_utils import create_database, database_exists +from sqlalchemy_utils import create_database, database_exists, CompositeType from sqlalchemy import Column, String, Integer, BigInteger, Boolean, Index, Enum, Numeric from sqlalchemy.schema import ForeignKeyConstraint @@ -37,30 +39,27 @@ # async engine def get_engine(settings: Settings): logger.critical(settings.pg_dsn) - engine = create_async_engine(settings.pg_dsn, - pool_size=128, - max_overflow=0, - pool_timeout=5, - pool_pre_ping=True, # using pessimistic approach about closed connections problem: https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic - echo=False, - connect_args={'server_settings': {'statement_timeout': '3000'}} - ) + engine = create_async_engine(settings.pg_dsn, + pool_size=128, + max_overflow=24, + pool_timeout=128, + echo=False) return engine engine = get_engine(settings) SessionMaker = sessionmaker(bind=engine, class_=AsyncSession) # # async engine -# def get_sync_engine(settings: Settings): -# pg_dsn = settings.pg_dsn.replace('+asyncpg', '') -# 
logger.critical(pg_dsn) -# engine = create_engine(pg_dsn, -# pool_size=128, -# max_overflow=0, -# pool_timeout=5, -# echo=False) -# return engine -# sync_engine = get_sync_engine(settings) -# SyncSessionMaker = sessionmaker(bind=sync_engine) +def get_sync_engine(settings: Settings): + pg_dsn = settings.pg_dsn.replace('+asyncpg', '') + logger.critical(pg_dsn) + engine = create_engine(pg_dsn, + pool_size=128, + max_overflow=0, + pool_timeout=5, + echo=False) + return engine +sync_engine = get_sync_engine(settings) +SyncSessionMaker = sessionmaker(bind=sync_engine) # database Base = declarative_base() @@ -119,8 +118,8 @@ class Block(Base): mc_block_shard: str = Column(BigInteger, nullable=True) mc_block_seqno: int = Column(Integer, nullable=True) - masterchain_block = relationship("Block", - remote_side=[workchain, shard, seqno], + masterchain_block = relationship("Block", + remote_side=[workchain, shard, seqno], backref='shard_blocks') global_id: int = Column(Integer) @@ -151,37 +150,129 @@ class Block(Base): transactions = relationship("Transaction", back_populates="block") -class Event(Base): - __tablename__ = 'events' - id: int = Column(BigInteger, primary_key=True) - meta: Dict[str, Any] = Column(JSONB) - - # transactions: List["EventTransaction"] = relationship("EventTransaction", back_populates="event") - transactions: List["Transaction"] = relationship("Transaction", - foreign_keys=[id], - primaryjoin='Event.id == Transaction.event_id', +class Trace(Base): + __tablename__ = 'traces' + trace_id = Column(String(44), primary_key=True) + external_hash: str = Column(String) + mc_seqno_start: int = Column(Integer) + mc_seqno_end: int = Column(Integer) + start_lt: int = Column(BigInteger) + start_utime: int = Column(Integer) + end_lt: int = Column(BigInteger) + end_utime: int = Column(Integer) + state = Column(Enum('complete', 'pending', 'broken', name='trace_state')) + pending_edges_: int = Column(BigInteger) + edges_: int = Column(BigInteger) + nodes_: int = Column(BigInteger) + classification_state = Column(Enum('unclassified', 'failed', 'ok', 'broken', name='trace_classification_state')) + + edges: List[TraceEdge] = relationship("TraceEdge", back_populates="trace", uselist=True, viewonly=True) + transactions: List["Transaction"] = relationship("Transaction", + foreign_keys=[trace_id], + primaryjoin='Trace.trace_id == Transaction.trace_id', uselist=True, viewonly=True) - edges: List["EventEdge"] = relationship("EventEdge", back_populates="event") -# class EventTransaction(Base): -# __tablename__ = 'event_transactions' -# event_id: int = Column(BigInteger, ForeignKey("events.id"), primary_key=True) -# tx_hash: str = Column(String, ForeignKey("transactions.hash"), primary_key=True) +class TraceEdge(Base): + __tablename__ = 'trace_edges' + trace_id: str = Column(String(44), ForeignKey("traces.trace_id"), primary_key=True) + msg_hash: str = Column(String(44), primary_key=True) + left_tx: str = Column(String) + right_tx: str = Column(String) + incomplete: bool = Column(Boolean) + broken: bool = Column(Boolean) -# event: Event = relationship("Event", back_populates="transactions") -# transactions: List["Transaction"] = relationship("Transaction", back_populates="event") + trace: "Trace" = relationship("Trace", back_populates="edges", viewonly=True) -class EventEdge(Base): - __tablename__ = 'event_graph' - event_id: int = Column(BigInteger, ForeignKey("events.id"), primary_key=True) - left_tx_hash: str = Column(String, primary_key=True) - right_tx_hash: str = Column(String, primary_key=True) - - 
event: "Event" = relationship("Event", back_populates="edges") +class Action(Base): + __tablename__ = 'actions' + action_id: str = Column(String, primary_key=True) + type: str = Column(String()) + trace_id: str = Column(String(44), ForeignKey('traces.trace_id'), nullable=False, primary_key=True) + tx_hashes: list[str] = Column(ARRAY(String())) + value: int = Column(Numeric) + amount: int = Column(Numeric) + start_lt: int | None = Column(BigInteger) + end_lt: int | None = Column(BigInteger) + start_utime: int | None = Column(BigInteger) + end_utime: int | None = Column(BigInteger) + source: str | None = Column(String(70)) + source_secondary: str | None = Column(String(70)) + destination: str | None = Column(String(70)) + destination_secondary: str | None = Column(String(70)) + asset: str | None = Column(String(70)) + asset_secondary: str | None = Column(String(70)) + asset2: str | None = Column(String(70)) + asset2_secondary: str | None = Column(String(70)) + opcode: int | None = Column(BigInteger) + success: bool = Column(Boolean) + ton_transfer_data = Column(CompositeType("ton_transfer_details", [ + Column("content", String), + Column("encrypted", Boolean) + ])) + jetton_transfer_data = Column(CompositeType("jetton_transfer_details", [ + Column("response_destination", String), + Column("forward_amount", Numeric), + Column("query_id", Numeric), + Column("custom_payload", String), + Column("forward_payload", String), + Column("comment", String), + Column("is_encrypted_comment", Boolean) + ])) + nft_transfer_data = Column(CompositeType("nft_transfer_details", [ + Column("is_purchase", Boolean), + Column("price", Numeric), + Column("query_id", Numeric), + Column("custom_payload", String), + Column("forward_payload", String), + Column("forward_amount", Numeric), + Column("response_destination", String), + Column("nft_item_index", Numeric), + ])) + jetton_swap_data = Column(CompositeType("jetton_swap_details", [ + Column("dex", String), + Column("sender", String), + Column("dex_incoming_transfer", CompositeType("dex_transfer_details",[ + Column("amount", Numeric), + Column("asset", String), + Column("source", String), + Column("destination", String), + Column("source_jetton_wallet", String), + Column("destination_jetton_wallet", String) + ])), + Column("dex_outgoing_transfer", CompositeType("dex_transfer_details", [ + Column("amount", Numeric), + Column("asset", String), + Column("source", String), + Column("destination", String), + Column("source_jetton_wallet", String), + Column("destination_jetton_wallet", String) + ])), + Column("peer_swaps", ARRAY(CompositeType("peer_swap_details", [ + Column("asset_in", String), + Column("amount_in", Numeric), + Column("asset_out", String), + Column("amount_out", Numeric), + ])))])) + change_dns_record_data = Column(CompositeType("change_dns_record_details", [ + Column("key", String), + Column("value_schema", String), + Column("value", String), + Column("flags", Integer) + ])) + nft_mint_data = Column(CompositeType("nft_mint_details", [ + Column("nft_item_index", Numeric)])) + + def __repr__(self): + full_repr = "" + for key, value in self.__dict__.items(): + if key.startswith("_"): + continue + full_repr += f"{key}={value}, " + return full_repr class Transaction(Base): __tablename__ = 'transactions' @@ -195,8 +286,8 @@ class Transaction(Base): block_workchain = Column(Integer) block_shard = Column(BigInteger) block_seqno = Column(Integer) - - mc_block_seqno: int = Column(Integer, nullable=True) + mc_block_seqno = Column(Integer) + trace_id = 
Column(String(44)) block = relationship("Block", back_populates="transactions") @@ -215,29 +306,75 @@ class Transaction(Base): account_state_hash_before = Column(String) account_state_hash_after = Column(String) - event_id: Optional[int] = Column(BigInteger) - account_state_before = relationship("AccountState", + descr = Column(Enum('ord', 'storage', 'tick_tock', 'split_prepare', + 'split_install', 'merge_prepare', 'merge_install', name='descr_type')) + aborted: bool = Column(Boolean) + destroyed: bool = Column(Boolean) + credit_first: bool = Column(Boolean) + is_tock: bool = Column(Boolean) + installed: bool = Column(Boolean) + storage_fees_collected: int = Column(BigInteger) + storage_fees_due: int = Column(BigInteger) + storage_status_change = Column(Enum('unchanged', 'frozen', 'deleted', name='status_change_type')) + credit_due_fees_collected: int = Column(BigInteger) + credit: int = Column(BigInteger) + compute_skipped: bool = Column(Boolean) + skipped_reason = Column(Enum('no_state', 'bad_state', 'no_gas', 'suspended', name='skipped_reason_type')) + compute_success: bool = Column(Boolean) + compute_msg_state_used: bool = Column(Boolean) + compute_account_activated: bool = Column(Boolean) + compute_gas_fees: int = Column(BigInteger) + compute_gas_used: int = Column(BigInteger) + compute_gas_limit: int = Column(BigInteger) + compute_gas_credit: int = Column(BigInteger) + compute_mode: int = Column(Integer) + compute_exit_code: int = Column(Integer) + compute_exit_arg: int = Column(Integer) + compute_vm_steps: int = Column(BigInteger) + compute_vm_init_state_hash: str = Column(String) + compute_vm_final_state_hash: str = Column(String) + action_success: bool = Column(Boolean) + action_valid: bool = Column(Boolean) + action_no_funds: bool = Column(Boolean) + action_status_change = Column(Enum('unchanged', 'frozen', 'deleted', name='status_change_type')) + action_total_fwd_fees: int = Column(BigInteger) + action_total_action_fees: int = Column(BigInteger) + action_result_code: int = Column(Integer) + action_result_arg: int = Column(Integer) + action_tot_actions: int = Column(Integer) + action_spec_actions: int = Column(Integer) + action_skipped_actions: int = Column(Integer) + action_msgs_created: int = Column(Integer) + action_action_list_hash: str = Column(String) + action_tot_msg_size_cells: int = Column(BigInteger) + action_tot_msg_size_bits: int = Column(BigInteger) + bounce = Column(Enum('negfunds', 'nofunds', 'ok', name='bounce_type')) + bounce_msg_size_cells: int = Column(BigInteger) + bounce_msg_size_bits: int = Column(BigInteger) + bounce_req_fwd_fees: int = Column(BigInteger) + bounce_msg_fees: int = Column(BigInteger) + bounce_fwd_fees: int = Column(BigInteger) + split_info_cur_shard_pfx_len: int = Column(Integer) + split_info_acc_split_depth: int = Column(Integer) + split_info_this_addr: str = Column(String) + split_info_sibling_addr: str = Column(String) + + account_state_before = relationship("AccountState", foreign_keys=[account_state_hash_before], - primaryjoin="AccountState.hash == Transaction.account_state_hash_before", + primaryjoin="AccountState.hash == Transaction.account_state_hash_before", viewonly=True) - account_state_after = relationship("AccountState", + account_state_after = relationship("AccountState", foreign_keys=[account_state_hash_after], - primaryjoin="AccountState.hash == Transaction.account_state_hash_after", + primaryjoin="AccountState.hash == Transaction.account_state_hash_after", viewonly=True) - account_state_latest = 
relationship("LatestAccountState", + account_state_latest = relationship("LatestAccountState", foreign_keys=[account], primaryjoin="LatestAccountState.account == Transaction.account", lazy='selectin', viewonly=True) - description = Column(JSONB) - - messages: List["TransactionMessage"] = relationship("TransactionMessage", back_populates="transaction") - event: Optional["Event"] = relationship("Event", - foreign_keys=[event_id], - primaryjoin="Transaction.event_id == Event.id", - viewonly=True) - # event: Event = relationship("EventTransaction", back_populates="transactions") - + messages: List[Message] = relationship("Message", back_populates="transaction", viewonly=True) + trace: Optional[Trace] = relationship("Trace", foreign_keys=[trace_id], primaryjoin="Transaction.trace_id == Trace.trace_id", viewonly=True) + emulated: bool = False class AccountState(Base): __tablename__ = 'account_states' @@ -253,7 +390,11 @@ class AccountState(Base): class Message(Base): __tablename__ = 'messages' - hash: str = Column(String(44), primary_key=True) + msg_hash: str = Column(String(44), primary_key=True) + tx_hash: str = Column(String(44), ForeignKey("transactions.hash"), primary_key=True) + tx_lt: int = Column(BigInteger, primary_key=True) + direction = Column(Enum('out', 'in', name='msg_direction'), primary_key=True) + trace_id: str = Column(String(44)) source: str = Column(String) destination: str = Column(String) value: int = Column(BigInteger) @@ -269,49 +410,47 @@ class Message(Base): body_hash: str = Column(String(44)) init_state_hash: Optional[str] = Column(String(44), nullable=True) - transactions = relationship("TransactionMessage", - foreign_keys=[hash], - primaryjoin="TransactionMessage.message_hash == Message.hash", - uselist=True, - viewonly=True) - message_content = relationship("MessageContent", + transaction = relationship("Transaction", + viewonly=True, + back_populates="messages", + foreign_keys=[tx_hash], + primaryjoin="Message.tx_hash == Transaction.hash") + + message_content = relationship("MessageContent", foreign_keys=[body_hash], primaryjoin="Message.body_hash == MessageContent.hash", viewonly=True) - init_state = relationship("MessageContent", + init_state = relationship("MessageContent", foreign_keys=[init_state_hash], - primaryjoin="Message.init_state_hash == MessageContent.hash", + primaryjoin="Message.init_state_hash == MessageContent.hash", viewonly=True) - - source_account_state = relationship("LatestAccountState", + + source_account_state = relationship("LatestAccountState", foreign_keys=[source], - primaryjoin="Message.source == LatestAccountState.account", + primaryjoin="Message.source == LatestAccountState.account", lazy='selectin', viewonly=True) - destination_account_state = relationship("LatestAccountState", + destination_account_state = relationship("LatestAccountState", foreign_keys=[destination], - primaryjoin="Message.destination == LatestAccountState.account", + primaryjoin="Message.destination == LatestAccountState.account", lazy='selectin', viewonly=True) + def __repr__(self): + opcode = self.opcode + if opcode is not None: + if opcode > 0: + opcode = hex(opcode) + else: + opcode = hex(opcode & 0xffffffff) -class TransactionMessage(Base): - __tablename__ = 'transaction_messages' - transaction_hash: str = Column(String(44), ForeignKey('transactions.hash'), primary_key=True) - message_hash: str = Column(String(44), primary_key=True) - direction: str = Column(Enum('in', 'out', name="direction"), primary_key=True) - - transaction: "Transaction" = 
relationship("Transaction", back_populates="messages") - # message = relationship("Message", back_populates="transactions") - message: "Message" = relationship("Message", foreign_keys=[message_hash], - primaryjoin="TransactionMessage.message_hash == Message.hash", - viewonly=True) + return f"Message({self.direction}, {self.msg_hash}, {opcode})" class MessageContent(Base): __tablename__ = 'message_contents' - + hash: str = Column(String(44), primary_key=True) body: str = Column(String) @@ -336,7 +475,7 @@ class JettonWallet(Base): foreign_keys=[address], primaryjoin="JettonWallet.address == JettonBurn.jetton_wallet_address", viewonly=True) - + jetton_master: "JettonMaster" = relationship("JettonMaster", foreign_keys=[jetton], primaryjoin="JettonWallet.jetton == JettonMaster.address") @@ -369,6 +508,7 @@ class JettonTransfer(Base): custom_payload = Column(String) forward_ton_amount: int = Column(Numeric) forward_payload = Column(String) + trace_id = Column(String(44)) transaction: Transaction = relationship("Transaction") jetton_wallet: JettonWallet = relationship("JettonWallet", @@ -421,10 +561,10 @@ class NFTItem(Base): code_hash = Column(String) data_hash = Column(String) - collection: Optional[NFTCollection] = relationship('NFTCollection', + collection: Optional[NFTCollection] = relationship('NFTCollection', foreign_keys=[collection_address], primaryjoin="NFTCollection.address == NFTItem.collection_address",) - + transfers: List["NFTTransfer"] = relationship('NFTTransfer', foreign_keys=[address], primaryjoin="NFTItem.address == NFTTransfer.nft_item_address",) @@ -447,6 +587,28 @@ class NFTTransfer(Base): foreign_keys=[nft_item_address], primaryjoin="NFTItem.address == NFTTransfer.nft_item_address",) + +class NftSale(Base): + __tablename__ = 'getgems_nft_sales' + + address = Column(String, primary_key=True) + is_complete = Column(Boolean) + created_at = Column(BigInteger) + marketplace_address = Column(String) + nft_address = Column(String) + nft_owner_address = Column(String) + full_price = Column(Numeric) + + +class NftAuction(Base): + __tablename__ = 'getgems_nft_auctions' + + address = Column(String, primary_key=True) + nft_addr = Column(String) + nft_owner = Column(String) + + + class LatestAccountState(Base): __tablename__ = 'latest_account_states' account = Column(String, primary_key=True) @@ -471,7 +633,6 @@ class LatestAccountState(Base): Index("transactions_index_2a", Transaction.account, Transaction.now) Index("transactions_index_3", Transaction.now, Transaction.hash) Index("transactions_index_4", Transaction.lt, Transaction.hash) -Index("transactions_index_6", Transaction.event_id) Index("transactions_index_8", Transaction.mc_block_seqno) # Index('account_states_index_1', AccountState.hash) @@ -485,10 +646,8 @@ class LatestAccountState(Base): # Index("messages_index_6", Message.body_hash) # Index("messages_index_7", Message.init_state_hash) -# Index("transaction_messages_index_1", TransactionMessage.transaction_hash) -Index("transaction_messages_index_2", TransactionMessage.message_hash) - -# Index("message_contents_index_1", MessageContent.hash) +# Index("transaction_messages_index_1", TransactionMessage.transaction_hash, postgresql_using='btree', postgresql_concurrently=False) +# Index("message_contents_index_1", MessageContent.hash, postgresql_using='btree', postgresql_concurrently=False) # Index("jetton_wallets_index_1", JettonWallet.address) Index("jetton_wallets_index_2", JettonWallet.owner) @@ -523,8 +682,3 @@ class LatestAccountState(Base): 
Index("nft_transfers_index_2", NFTTransfer.nft_item_address) Index("nft_transfers_index_3", NFTTransfer.old_owner) Index("nft_transfers_index_4", NFTTransfer.new_owner) - - -# # event indexes -# Index("event_transaction_index_1", EventTransaction.tx_hash) -Index("event_detector__transaction_index_1", Transaction.lt.asc(), postgresql_where=(Transaction.event_id.is_(None))) diff --git a/indexer/indexer/core/redis.py b/indexer/indexer/core/redis.py new file mode 100644 index 0000000..5ba834d --- /dev/null +++ b/indexer/indexer/core/redis.py @@ -0,0 +1,30 @@ +import redis.asyncio as aioredis +import redis + +from indexer.core.settings import Settings +import logging +logger = logging.getLogger(__name__) + +settings = Settings() + + +def create_redis_client(): + dsn = settings.redis_dsn + if dsn.startswith("tcp"): + dsn = "redis" + dsn[3:] + pool = aioredis.ConnectionPool.from_url(dsn, max_connections=800) + client = aioredis.Redis(connection_pool=pool, max_connections=800) + logger.info(f"redis: {client}") + return client + +def create_sync_redis_client(): + dsn = settings.redis_dsn + if dsn.startswith("tcp"): + dsn = "redis" + dsn[3:] + pool = redis.ConnectionPool.from_url(dsn, max_connections=800) + client = redis.Redis(connection_pool=pool, max_connections=800) + logger.info(f"sync redis: {client}") + return client + +client = create_redis_client() +sync_client = create_sync_redis_client() diff --git a/indexer/indexer/core/settings.py b/indexer/indexer/core/settings.py index f7c8aaa..cba89e7 100644 --- a/indexer/indexer/core/settings.py +++ b/indexer/indexer/core/settings.py @@ -3,9 +3,11 @@ class Settings(BaseSettings): pg_dsn: str = 'postgresql+asyncpg://localhost:5432/ton_index_a' + redis_dsn: str = '' api_root_path: str = '' api_title: str = '' ton_http_api_endpoint: str = '' + emulated_traces: bool = False is_testnet: bool = False class Config: diff --git a/indexer/indexer/events/__init__.py b/indexer/indexer/events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/indexer/indexer/events/blocks/__init__.py b/indexer/indexer/events/blocks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/indexer/indexer/events/blocks/auction.py b/indexer/indexer/events/blocks/auction.py new file mode 100644 index 0000000..1b8a228 --- /dev/null +++ b/indexer/indexer/events/blocks/auction.py @@ -0,0 +1,55 @@ +from indexer.events.blocks.utils import AccountId, Amount +from indexer.events import context +from indexer.events.blocks.basic_matchers import BlockMatcher +from indexer.events.blocks.basic_blocks import Block, TonTransferBlock + + +class AuctionBid(Block): + def __init__(self, data): + super().__init__('auction_bid', [], data) + + def __repr__(self): + return f"Auction bid {self.event_nodes[0].message.transaction.hash}" + + +def _is_teleitem(data: dict): + if 'content' not in data: + return False + if 'uri' in data['content'] and 'https://nft.fragment.com' in data['content']['uri']: + return True + return False + + +class AuctionBidMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=None, include_excess=False) + + def test_self(self, block: Block): + return isinstance(block, TonTransferBlock) + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + interfaces = await context.interface_repository.get().get_interfaces(block.event_nodes[0].message.destination) + if interfaces is None: + return [] + + bid_block = AuctionBid({}) + + if 'NftAuction' in interfaces: + bid_block.data = { + 'amount': 
Amount(block.event_nodes[0].message.value), + 'bidder': AccountId(block.event_nodes[0].message.source), + 'auction': AccountId(block.event_nodes[0].message.destination), + 'nft_address': AccountId(interfaces['NftAuction']['nft_addr']), + } + elif 'NFTItem' in interfaces and _is_teleitem(interfaces['NFTItem']): + bid_block.data = { + 'amount': Amount(block.event_nodes[0].message.value), + 'bidder': AccountId(block.event_nodes[0].message.source), + 'auction': AccountId(block.event_nodes[0].message.destination), + 'nft_address': AccountId(block.event_nodes[0].message.destination), + } + else: + return [] + bid_block.merge_blocks([block]) + return [block] + diff --git a/indexer/indexer/events/blocks/basic_blocks.py b/indexer/indexer/events/blocks/basic_blocks.py new file mode 100644 index 0000000..d779cab --- /dev/null +++ b/indexer/indexer/events/blocks/basic_blocks.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import base64 + +from pytoniq_core import Slice + +from indexer.core.database import Message +from indexer.events.blocks.messages import TonTransferMessage +from indexer.events.blocks.core import Block, AccountValueFlow +from indexer.events.blocks.utils import AccountId, Amount +from indexer.events.blocks.utils.tree_utils import EventNode + + +def _fill_flow_from_node(flow: AccountValueFlow, node: EventNode): + if node.message.value is not None: + assert node.message.direction == "in" + flow.add_ton(AccountId(node.message.destination), node.message.value) + flow.add_ton(AccountId(node.message.source), -node.message.value) + flow.add_fees(AccountId(node.message.destination), node.message.transaction.total_fees) + elif node.message.direction == "in": + flow.add_fees(AccountId(node.message.destination), node.message.transaction.total_fees) + + for msg in node.message.transaction.messages: + if msg.fwd_fee is not None and msg.direction == "out": + flow.add_fees(AccountId(msg.source), msg.fwd_fee) + + +class TonTransferBlock(Block): + value: int + comment: str | None + encrypted: bool + + def __init__(self, node: EventNode): + msg = TonTransferMessage(Slice.one_from_boc(node.message.message_content.body)) + self.encrypted = msg.encrypted + self.comment_encoded = False + if msg.comment is not None: + if self.encrypted: + self.comment_encoded = True + self.comment = str(base64.b64encode(msg.comment), encoding='utf-8') + else: + try: + self.comment = str(msg.comment, encoding='utf-8') + except Exception: + self.comment_encoded = True + self.comment = str(base64.b64encode(msg.comment), encoding='utf-8') + else: + self.comment = None + + super().__init__('ton_transfer', [node], { + 'source': AccountId(node.message.source) if node.message.source is not None else None, + 'destination': AccountId( + node.message.destination) if node.message.destination is not None else None, + 'value': Amount(node.message.value), + 'comment': self.comment, + 'encrypted': self.encrypted, + }) + self.failed = node.failed + self.value = node.message.value + + _fill_flow_from_node(self.value_flow, node) + + +class ContractDeploy(Block): + def __init__(self, node: EventNode): + super().__init__('contract_deploy', [], node.message.transaction.account) + self.failed = node.failed + + +class CallContractBlock(Block): + opcode: int + + def __init__(self, node: EventNode): + super().__init__('call_contract', [node], { + 'opcode': node.get_opcode(), + 'source': AccountId(node.message.source) if node.message.source is not None else None, + 'destination': AccountId( + node.message.destination) if 
node.message.destination is not None else None, + 'value': Amount(node.message.value), + }) + self.failed = node.failed + self.is_external = node.message.source is None + self.opcode = node.get_opcode() + _fill_flow_from_node(self.value_flow, node) + + def get_body(self) -> Slice: + return Slice.one_from_boc(self.event_nodes[0].message.message_content.body) + + def get_message(self) -> Message: + return self.event_nodes[0].message + + def __repr__(self): + return f"!{self.btype}:={hex(self.opcode)}" diff --git a/indexer/indexer/events/blocks/basic_matchers.py b/indexer/indexer/events/blocks/basic_matchers.py new file mode 100644 index 0000000..64b89c1 --- /dev/null +++ b/indexer/indexer/events/blocks/basic_matchers.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import logging + +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.core import Block +from indexer.events.blocks.messages import ExcessMessage + +logger = logging.getLogger(__name__) + + +class BlockMatcher: + + def __init__(self, child_matcher: BlockMatcher | None = None, + parent_matcher: BlockMatcher | None = None, + optional=False, + children_matchers=None, + include_excess=True): + self.child_matcher = child_matcher + self.children_matchers = children_matchers + self.parent_matcher = parent_matcher + self.optional = optional + self.include_excess = include_excess + + def test_self(self, block: Block): + return True + + async def try_build(self, block: Block) -> list[Block] | None: + child_matched = True + parent_matched = True + self_matched = self.test_self(block) + if not self_matched: + return None + blocks = [] + child_matched = await self.process_child_matcher(block, blocks, child_matched) + parent_matched = await self.process_parent_matcher(block, blocks, parent_matched) + if self_matched and parent_matched and child_matched: + try: + r = await self.build_block(block, blocks) + if self.include_excess: + for next_block in block.next_blocks: + if isinstance(next_block, CallContractBlock) and next_block.opcode == ExcessMessage.opcode: + r.append(next_block) + return r + except Exception as e: + logger.error(f"Error while building block {block} with matcher {self.__class__.__name__}: {e}. 
Trace id: {block.event_nodes[0].message.trace_id}") + return None + else: + return None + + async def process_parent_matcher(self, block, blocks, parent_matched): + if self.parent_matcher is not None: + matcher_parent_blocks = await self.parent_matcher.try_build(block.previous_block) + if matcher_parent_blocks is not None: + parent_matched = True + blocks.extend(matcher_parent_blocks) + else: + parent_matched = self.parent_matcher.optional + return parent_matched + + async def process_child_matcher(self, block, blocks, child_matched): + if self.child_matcher is not None: + for child in block.next_blocks: + r = await self.child_matcher.try_build(child) + if r is not None: + blocks.extend(r) + child_matched = self.child_matcher.optional or len(blocks) > 0 + if self.children_matchers is not None: + r = await self.process_children_matchers(block, blocks, child_matched) + if r is not None: + blocks.extend(r) + else: + child_matched = False + return child_matched + + async def process_children_matchers(self, block, blocks, child): + next_blocks = block.next_blocks.copy() + remaining_matchers = self.children_matchers.copy() + blocks = [] + while len(remaining_matchers) > 0: + matcher = remaining_matchers[0] + matched = False + + for next_block in next_blocks: + res = await matcher.try_build(next_block) + if res is not None: + blocks.extend(res) + remaining_matchers.pop(0) + next_blocks.remove(next_block) + matched = True + break + if not matched: + if matcher.optional: + remaining_matchers.pop(0) + else: + return None + + if len(remaining_matchers) == 0: + return blocks + if all(m.optional for m in remaining_matchers): + return blocks + else: + return None + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + return [block] + other_blocks + + +class OrMatcher(BlockMatcher): + def __init__(self, matchers: list[BlockMatcher], optional=False): + super().__init__(child_matcher=None, parent_matcher=None, optional=optional) + self.matchers = matchers + + def test_self(self, block: Block): + return any(m.test_self(block) for m in self.matchers) + + async def try_build(self, block: Block) -> list[Block] | None: + for m in self.matchers: + res = await m.try_build(block) + if res is not None: + return res + return None + + +class TonTransferMatcher(BlockMatcher): + + def __init__(self): + super().__init__(child_matcher=None, parent_matcher=None) + + def test_self(self, block: Block): + return block.btype == "ton_transfer" + + +class ContractMatcher(BlockMatcher): + def __init__(self, opcode, + child_matcher=None, + parent_matcher=None, + optional=False, + children_matchers=None, + include_excess=True): + super().__init__(child_matcher, parent_matcher, optional, children_matchers, include_excess) + self.opcode = opcode + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == self.opcode + + +class BlockTypeMatcher(BlockMatcher): + def __init__(self, block_type, child_matcher=None, parent_matcher=None, optional=False): + super().__init__(child_matcher, parent_matcher, optional) + self.block_type = block_type + + def test_self(self, block: Block): + return block.btype == self.block_type + + +def child_sequence_matcher(matchers: list[BlockMatcher]) -> BlockMatcher | None: + if len(matchers) == 0: + return None + if len(matchers) == 1: + return matchers[0] + + root_matcher = matchers[0] + current_matcher = matchers[0] + for matcher in matchers[1:]: + current_matcher.child_matcher = matcher + current_matcher = matcher + return 
root_matcher diff --git a/indexer/indexer/events/blocks/core.py b/indexer/indexer/events/blocks/core.py new file mode 100644 index 0000000..d08d8a2 --- /dev/null +++ b/indexer/indexer/events/blocks/core.py @@ -0,0 +1,283 @@ +from __future__ import annotations + +from typing import Callable, Iterable + +from indexer.events.blocks.utils import AccountId +from indexer.events.blocks.utils.tree_utils import EventNode + + +def _get_direction_for_block(block: Block) -> int: + if len(block.event_nodes) == 1: + return 1 if block.event_nodes[0].message.direction == "out" else 0 + return 2 + + +def _ensure_earliest_common_block(blocks: list[Block]) -> Block | None: + """Ensures that all blocks have a common block in their previous blocks, and returns it.""" + # find block with min min_lt + blocks = sorted(blocks, key=lambda b: (b.min_lt, _get_direction_for_block(b))) + earliest_node_in_block = blocks[0] + connected = [earliest_node_in_block] + for block in blocks: + if block in connected: + continue + else: + if block.previous_block is not None: + if block.previous_block in connected: + connected.append(block) + continue + else: + return None + return earliest_node_in_block + + +class AccountFlow: + ton: int + fees: int + jettons: dict[AccountId, int] + + def __init__(self): + self.ton = 0 + self.fees = 0 + self.jettons = {} + + def merge(self, other: AccountFlow): + self.ton += other.ton + self.fees += other.fees + for jetton, amount in other.jettons.items(): + if jetton not in self.jettons: + self.jettons[jetton] = 0 + self.jettons[jetton] += amount + + def to_dict(self): + return { + 'ton': str(self.ton), + 'fees': str(self.fees), + 'jettons': {str(jetton): str(amount) for jetton, amount in self.jettons.items()} + } + + +class AccountValueFlow: + flow: dict[AccountId, AccountFlow] + + def __init__(self): + self.flow = {} + + def to_dict(self): + return { + 'flow': {str(account): flow.to_dict() for account, flow in self.flow.items()} + } + + def add_ton(self, account: AccountId, amount: int): + if account not in self.flow: + self.flow[account] = AccountFlow() + self.flow[account].ton += amount + + def add_jetton(self, account: AccountId, jetton: AccountId, amount: int): + if account not in self.flow: + self.flow[account] = AccountFlow() + if jetton not in self.flow[account].jettons: + self.flow[account].jettons[jetton] = 0 + self.flow[account].jettons[jetton] += amount + + def add_fees(self, account: AccountId, amount: int): + if account not in self.flow: + self.flow[account] = AccountFlow() + self.flow[account].fees += amount + + def merge(self, other: AccountValueFlow): + for account, flow in other.flow.items(): + if account not in self.flow: + self.flow[account] = AccountFlow() + self.flow[account].merge(flow) + + +class Block: + event_nodes: list[EventNode] + children_blocks: list[Block] + next_blocks: list[Block] + contract_deployments: set[AccountId] + failed: bool + previous_block: Block + parent: Block + data: any + min_lt: int + min_utime: int + max_utime: int + max_lt: int + type: str + value_flow: AccountValueFlow + + def __init__(self, type: str, nodes: list[EventNode], v=None): + self.failed = False + self.broken = False + self.event_nodes = nodes + self.children_blocks = [] + self.next_blocks = [] + self.previous_block = None + self.parent = None + self.btype = type + self.data = v + self.contract_deployments = set() + self.value_flow = AccountValueFlow() + if len(nodes) != 0: + self.min_lt = nodes[0].message.transaction.lt + self.max_lt = nodes[0].message.transaction.lt + 
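+            # lt/utime bounds start at the seed node; merge_blocks() widens them via calculate_min_max_lt()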
self.min_utime = nodes[0].message.transaction.now
+            self.max_utime = nodes[0].message.transaction.now
+            self._find_contract_deployments()
+
+        else:
+            self.min_lt = 0
+            self.max_lt = 0
+            self.min_utime = 0
+            self.max_utime = 0
+
+    def calculate_min_max_lt(self):
+        self.min_lt = min(n.message.transaction.lt for n in self.event_nodes)
+        self.min_utime = min(n.message.transaction.now for n in self.event_nodes)
+        self.max_lt = max(n.message.transaction.lt for n in self.event_nodes)
+        self.max_utime = max(n.message.transaction.now for n in self.event_nodes)
+
+    def iter_prev(self, predicate: Callable[[Block], bool]) -> Iterable[Block]:
+        """Iterates over all previous blocks that match predicate, starting from the closest one."""
+        r = self.previous_block
+        while r is not None:
+            if predicate(r):
+                yield r
+            r = r.previous_block
+
+    def any_parent(self, predicate: Callable[[Block], bool]) -> bool:
+        """Checks if any of the previous blocks matches predicate."""
+        r = self.previous_block
+        while r is not None:
+            if predicate(r):
+                return True
+            r = r.previous_block
+        return False
+
+    def merge_blocks(self, blocks: list[Block]):
+        """Merges all blocks into one. Preserves structure."""
+        blocks_to_merge = list(set(blocks))
+        earliest_block = _ensure_earliest_common_block(blocks_to_merge)
+        if earliest_block is None:
+            raise ValueError("Earliest common block not found")
+        for block in blocks_to_merge:
+            block.parent = self
+            self.event_nodes.extend(block.event_nodes)
+            self.children_blocks.append(block)
+            self.value_flow.merge(block.value_flow)
+        # Re-attach edges: every block found just outside the merged set
+        # becomes a next_block of the merged block and points back to it.
+        for block in earliest_block.find_next(lambda b, d: b in blocks_to_merge, stop_on_filter_unmatch=True,
+                                              yield_on_unmatch=True):
+            self.next_blocks.append(block)
+            self.contract_deployments = self.contract_deployments.union(block.contract_deployments)
+            block.previous_block = self
+        self.previous_block = earliest_block.previous_block
+        if earliest_block.previous_block is not None:
+            earliest_block.previous_block.compact_connections()
+        self.calculate_min_max_lt()
+
+    def find_next(self,
+                  node_filter: Callable[['Block', int], bool] = None,
+                  max_depth=-1,
+                  stop_on_filter_unmatch: bool = False,
+                  yield_on_unmatch: bool = False) -> Iterable['Block']:
+        """Iterates over all next blocks that match predicate, starting from the closest one."""
+        queue = list([(c, 0) for c in self.next_blocks])
+        while len(queue) > 0:
+            node, depth = queue.pop(0)
+            filter_matched = node_filter is None or node_filter(node, depth)
+            if filter_matched and not yield_on_unmatch:
+                yield node
+            elif not filter_matched and yield_on_unmatch:
+                yield node
+            should_extend_queue = (depth + 1 <= max_depth or max_depth < 0)
+            if stop_on_filter_unmatch:
+                should_extend_queue = should_extend_queue and filter_matched
+            if should_extend_queue:
+                queue.extend([(c, depth + 1) for c in node.next_blocks])
+
+    def connect(self, other: 'Block'):
+        self.next_blocks.append(other)
+        other.previous_block = self
+
+    def topmost_parent(self):
+        if self.parent is None:
+            return self
+        else:
+            return self.parent.topmost_parent()
+
+    def compact_connections(self):
+        self.next_blocks = list(
+            set(n.topmost_parent() for n in self.next_blocks if n not in self.children_blocks and n != self))
+
+    def set_prev(self, prev: 'Block'):
+        self.previous_block = prev
+
+    def __repr__(self):
+        return
f"!{self.btype}:={self.data}" + + def bfs_iter(self): + queue = [self] + while len(queue) > 0: + cur = queue.pop(0) + yield cur + queue.extend(cur.next_blocks) + + def dict(self): + return { + "btype": self.btype, + "nodes": [n.data for n in self.event_nodes], + "children": [c.dict() for c in self.children_blocks], + "next": [n.dict() for n in self.next_blocks], + "value": self.data + } + + def calculate_progress(self): + total = 0 + total_emulated = 0 + for node in self.event_nodes: + if node.message.destination is not None and node.message.source is not None: + total += 1 + if node.message.transaction.emulated: + total_emulated += 1 + return (1.0 - total_emulated / total) if total != 0 else 0 + + def _find_contract_deployments(self): + for node in self.event_nodes: + if node.message.transaction.orig_status != 'active' and node.message.transaction.end_status == 'active': + self.contract_deployments.add(AccountId(node.message.transaction.account)) + +class SingleLevelWrapper(Block): + def __init__(self): + super().__init__('single_wrapper', [], None) + + def wrap(self, blocks: list[Block]): + block_queue = blocks.copy() + nodes = [] + while len(block_queue) > 0: + block = block_queue.pop(0) + if isinstance(block, SingleLevelWrapper): + block_queue.extend(block.children_blocks) + continue + self.children_blocks.append(block) + for next_block in block.next_blocks: + if next_block not in blocks: + self.next_blocks.append(next_block) + nodes.extend(block.event_nodes) + if block.previous_block not in blocks: + self.previous_block = block.previous_block + self.event_nodes = list(set(nodes)) + + self.compact_connections() + self.calculate_min_max_lt() diff --git a/indexer/indexer/events/blocks/dns.py b/indexer/indexer/events/blocks/dns.py new file mode 100644 index 0000000..7e35edb --- /dev/null +++ b/indexer/indexer/events/blocks/dns.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from pytoniq_core import Slice + +from indexer.events.blocks.messages.dns import ChangeDnsRecordMessage +from indexer.events.blocks.utils import AccountId +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, ContractMatcher +from indexer.events.blocks.core import Block + + +class DeleteDnsRecordBlock(Block): + def __init__(self, data): + super().__init__('delete_dns', [], data) + + def __repr__(self): + return f"DELETE_DNS {self.event_nodes[0].message.transaction.hash}" + + +class ChangeDnsRecordBlock(Block): + def __init__(self, data): + super().__init__('change_dns', [], data) + + def __repr__(self): + return f"CHANGE_DNS {self.event_nodes[0].message.transaction.hash}" + + +class ChangeDnsRecordMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=ContractMatcher(opcode=0xffffffff, + optional=True, + include_excess=False)) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == ChangeDnsRecordMessage.opcode + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + change_dns_message = ChangeDnsRecordMessage(Slice.one_from_boc(block.event_nodes[0].message.message_content.body)) + new_block = None + sender = block.event_nodes[0].message.source + + if change_dns_message.has_value: + new_block = ChangeDnsRecordBlock({ + 'source': AccountId(sender) if sender is not None else None, + 'destination': AccountId(block.event_nodes[0].message.destination), + 'key': change_dns_message.key, + 'value': change_dns_message.value, + }) + else: 
+ new_block = DeleteDnsRecordBlock({ + 'source': AccountId(sender) if sender is not None else None, + 'destination': AccountId(block.event_nodes[0].message.destination), + 'key': change_dns_message.key, + }) + new_block.failed = block.failed + new_block.merge_blocks([block] + other_blocks) + return [new_block] diff --git a/indexer/indexer/events/blocks/elections.py b/indexer/indexer/events/blocks/elections.py new file mode 100644 index 0000000..50d893c --- /dev/null +++ b/indexer/indexer/events/blocks/elections.py @@ -0,0 +1,70 @@ +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, ContractMatcher, OrMatcher +from indexer.events.blocks.core import Block +from indexer.events.blocks.utils import Amount, AccountId +from indexer.events.blocks.utils.block_utils import find_call_contract + + +class ElectionDepositStakeBlock(Block): + def __init__(self, data): + super().__init__('election_deposit', [], data) + + def __repr__(self): + return f"ELECTION_DEPOSIT {self.event_nodes[0].message.transaction.hash}" + + +class ElectionRecoverStakeBlock(Block): + def __init__(self, data): + super().__init__('election_recover', [], data) + + def __repr__(self): + return f"ELECTION_RECOVER {self.event_nodes[0].message.transaction.hash}" + + +class ElectionDepositStakeBlockMatcher(BlockMatcher): + def __init__(self): + confirmation_matcher = ContractMatcher(opcode=0xf374484c, optional=True, include_excess=False) + super().__init__(child_matcher=OrMatcher([ + ContractMatcher(opcode=0x4e73744b, optional=True, include_excess=False, child_matcher=confirmation_matcher), + confirmation_matcher]), include_excess=False) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == 0x4e73744b + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + data = { + 'amount': Amount(block.event_nodes[0].message.value), + 'stake_holder': AccountId(block.event_nodes[0].message.source), + } + new_block = ElectionDepositStakeBlock(data) + new_block.failed = find_call_contract(other_blocks, 0xf374484c) is None + new_block.merge_blocks([block] + other_blocks) + return [new_block] + + +class ElectionRecoverStakeBlockMatcher(BlockMatcher): + def __init__(self): + confirmation_matcher = ContractMatcher(opcode=0xf96f7324, + optional=True, + include_excess=False) + super().__init__(child_matcher=OrMatcher([ + ContractMatcher(opcode=0x47657424, optional=True, include_excess=False, child_matcher=confirmation_matcher), + confirmation_matcher]), include_excess=False) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == 0x47657424 + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + data = { + 'stake_holder': AccountId(block.event_nodes[0].message.source) + } + response = find_call_contract(other_blocks, 0xf96f7324) + failed = False + if response is not None: + data['amount'] = Amount(response.event_nodes[0].message.value) + else: + failed = True + new_block = ElectionRecoverStakeBlock(data) + new_block.failed = failed + new_block.merge_blocks([block] + other_blocks) + return [new_block] diff --git a/indexer/indexer/events/blocks/jettons.py b/indexer/indexer/events/blocks/jettons.py new file mode 100644 index 0000000..22df95f --- /dev/null +++ b/indexer/indexer/events/blocks/jettons.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import base64 + +from 
indexer.events.blocks.utils.block_utils import find_call_contract +from indexer.events.blocks.messages import JettonNotify, JettonInternalTransfer, ExcessMessage, JettonBurnNotification +from indexer.events import context +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, OrMatcher, ContractMatcher, child_sequence_matcher +from indexer.events.blocks.core import Block +from indexer.events.blocks.messages import JettonTransfer, JettonBurn +from indexer.events.blocks.utils import AccountId, Asset, Amount + + +class JettonTransferBlock(Block): + def __init__(self, data): + super().__init__('jetton_transfer', [], data) + + def __repr__(self): + return f"JETTON TRANSFER {self.event_nodes[0].message.transaction.hash}" + + +class JettonBurnBlock(Block): + def __init__(self): + super().__init__('jetton_burn', [], {}) + + def __repr__(self): + return f"JETTON BURN {self.data}" + + +class JettonTransferBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=OrMatcher([ + ContractMatcher(opcode=JettonNotify.opcode, optional=True), + ContractMatcher(opcode=JettonInternalTransfer.opcode, optional=True, + child_matcher=ContractMatcher(opcode=JettonNotify.opcode, optional=True)) + ], optional=True), parent_matcher=None) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == JettonTransfer.opcode + + async def build_block(self, block: Block | CallContractBlock, other_blocks: list[Block]) -> list[Block]: + new_block = JettonTransferBlock({}) + include = [block] + include.extend(other_blocks) + jetton_transfer_message = JettonTransfer(block.get_body()) + receiver: AccountId = AccountId(jetton_transfer_message.destination) + has_internal_transfer = find_call_contract(other_blocks, JettonInternalTransfer.opcode) is not None + data = { + 'has_internal_transfer': has_internal_transfer, + 'sender': None, + 'sender_wallet': AccountId(block.event_nodes[0].message.destination), + 'receiver': receiver, + 'response_address': AccountId(jetton_transfer_message.response), + 'forward_amount': Amount(jetton_transfer_message.forward_amount), + 'query_id': jetton_transfer_message.query_id, + 'asset': None, + 'amount': Amount(jetton_transfer_message.amount), + 'forward_payload': base64.b64encode(jetton_transfer_message.forward_payload).decode( + 'utf-8') if jetton_transfer_message.forward_payload is not None else None, + 'custom_payload': base64.b64encode(jetton_transfer_message.custom_payload).decode( + 'utf-8') if jetton_transfer_message.custom_payload is not None else None, + 'comment': jetton_transfer_message.comment, + 'encrypted_comment': jetton_transfer_message.encrypted_comment, + 'payload_opcode': jetton_transfer_message.payload_sum_type, + 'stonfi_swap_body': jetton_transfer_message.stonfi_swap_body + } + if len(block.next_blocks) > 0: + data['receiver_wallet'] = AccountId(block.next_blocks[0].event_nodes[0].message.destination) + sender = await context.interface_repository.get().get_jetton_wallet( + block.event_nodes[0].message.destination) + if sender is not None: + data['sender'] = AccountId(sender.owner) if sender is not None else None + data['asset'] = Asset(is_ton=False, jetton_address=sender.jetton if sender is not None else None) + new_block.value_flow.add_jetton(AccountId(sender.owner), AccountId(sender.jetton), + -jetton_transfer_message.amount) + new_block.value_flow.add_jetton(receiver, AccountId(sender.jetton), + jetton_transfer_message.amount) + 
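# The two value_flow.add_jetton calls above record the transfer as a
+ # double entry: the sender's owner is debited and the receiver credited
+ # with the same amount, so a plain transfer nets to zero across a trace.
+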
else: + return [] + + new_block.data = data + new_block.merge_blocks(include) + new_block.failed = block.failed + return [new_block] + + +async def _get_jetton_burn_data(new_block: Block, block: Block | CallContractBlock) -> dict: + jetton_burn_message = JettonBurn(block.get_body()) + wallet = await context.interface_repository.get().get_jetton_wallet(block.get_message().destination) + assert wallet is not None + new_block.value_flow.add_jetton(AccountId(wallet.owner), AccountId(wallet.jetton), -jetton_burn_message.amount) + data = { + 'owner': AccountId(wallet.owner) if wallet is not None else None, + 'jetton_wallet': AccountId(block.get_message().destination), + 'amount': Amount(jetton_burn_message.amount), + 'asset': Asset(is_ton=False, jetton_address=wallet.jetton if wallet is not None else None) + } + return data + + +class JettonBurnBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=ContractMatcher(opcode=JettonBurnNotification.opcode, optional=True, + include_excess=True)) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == JettonBurn.opcode + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + new_block = JettonBurnBlock() + include = [block] + include.extend(other_blocks) + new_block.data = await _get_jetton_burn_data(new_block, block) + new_block.merge_blocks(include) + return [new_block] diff --git a/indexer/indexer/events/blocks/messages/__init__.py b/indexer/indexer/events/blocks/messages/__init__.py new file mode 100644 index 0000000..38297cb --- /dev/null +++ b/indexer/indexer/events/blocks/messages/__init__.py @@ -0,0 +1,5 @@ + +from .swaps import * +from .common import * +from .jettons import * +from .nft import * \ No newline at end of file diff --git a/indexer/indexer/events/blocks/messages/common.py b/indexer/indexer/events/blocks/messages/common.py new file mode 100644 index 0000000..697ab5b --- /dev/null +++ b/indexer/indexer/events/blocks/messages/common.py @@ -0,0 +1,27 @@ +from pytoniq_core import Slice + + +class ExcessMessage: + opcode = 0xd53276db + + +class TonTransferMessage: + opcode = 0 + encrypted_opcode = 0x2167da4b + + def __init__(self, boc: Slice): + if boc.remaining_bits < 32: + self.encrypted = False + self.comment = None + return + op = boc.load_uint(32) # opcode + if op & 0xFFFFFFFF == TonTransferMessage.encrypted_opcode: + self.encrypted = True + else: + self.encrypted = False + self.comment = None + try: + if boc.remaining_bits >= 8 and not boc.remaining_bits % 8 and boc.remaining_refs in (0, 1): + self.comment = boc.load_snake_bytes() + except Exception: + pass diff --git a/indexer/indexer/events/blocks/messages/dns.py b/indexer/indexer/events/blocks/messages/dns.py new file mode 100644 index 0000000..4861403 --- /dev/null +++ b/indexer/indexer/events/blocks/messages/dns.py @@ -0,0 +1,59 @@ +from pytoniq_core import Slice + +from indexer.events.blocks.utils import AccountId + + +class ChangeDnsRecordMessage: + opcode = 0x4eb1f0f9 + + def __init__(self, boc: Slice): + op = boc.load_uint(32) # opcode + self.key = boc.load_bytes(32) + self.has_value = boc.remaining_refs > 0 + if self.has_value: + self._parse_value(boc.load_ref().to_slice()) + else: + self.value = None + + def _parse_value(self, value: Slice): + schema = value.load_uint(16) + if schema == 0xba93: + self.value = { + 'schema': 'DNSNextResolver', + 'address': AccountId(value.load_address()) + } + elif schema == 0xad01: + self.value = { + 'schema': 'DNSAdnlAddress', + 
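# DNSAdnlAddress carries a raw 256-bit ADNL address plus a flags byte,
+ # unlike the MsgAddress used by the DNSNextResolver and DNSSmcAddress
+ # schemas.
+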
'address': value.load_bytes(32), + 'flags': value.load_uint(8) + } + elif schema == 0x9fd3: + self.value = { + 'schema': 'DNSSmcAddress', + 'address': AccountId(value.load_address()), + 'flags': value.load_uint(8) + } + elif schema == 0x7473: + self.value = { + 'schema': 'DNSStorageAddress', + 'address': value.load_bytes(32) + } + elif schema == 0x1eda: + dns_text = "" + chunks_count = value.load_uint(8) + value_slice = value + while chunks_count > 0: + length = value.load_uint(8) + dns_text += value.load_string(length) + chunks_count -= 1 + if chunks_count > 0: + value_slice = value_slice.load_ref() + self.value = { + 'schema': 'DNSText', + 'dns_text': dns_text + } + else: + self.value = { + 'schema': 'Unknown' + } diff --git a/indexer/indexer/events/blocks/messages/jettons.py b/indexer/indexer/events/blocks/messages/jettons.py new file mode 100644 index 0000000..7784c4b --- /dev/null +++ b/indexer/indexer/events/blocks/messages/jettons.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from pytoniq_core import Slice, Address, Cell + + +class JettonTransfer: + opcode = 0x0f8a7ea5 + query_id: int + amount: int + destination: Address + response: Address + custom_payload: bytes | None + forward_amount: int + comment: bytes | None + encrypted_comment: bool + forward_payload: bytes | None + stonfi_swap_body: dict | None + + def __init__(self, boc: Slice): + boc.load_uint(32) # opcode + self.query_id = boc.load_uint(64) + self.amount = boc.load_coins() + self.destination = boc.load_address() + self.response = boc.load_address() + custom_payload = boc.load_maybe_ref() + if custom_payload: + self.custom_payload = custom_payload.to_boc(hash_crc32=True) + else: + self.custom_payload = None + self.forward_amount = boc.load_coins() + self.comment = None + self.encrypted_comment = False + self.payload_sum_type = None + self.stonfi_swap_body = None + payload_slice = boc.load_ref().to_slice() if boc.load_bool() else boc.copy() + self._load_forward_payload(payload_slice) + + def _load_forward_payload(self, payload_slice: Slice): + if payload_slice.remaining_bits == 0: + self.forward_payload = None + return + else: + self.forward_payload = payload_slice.to_cell().to_boc(hash_crc32=True) + if payload_slice.remaining_bits < 32: + self.sum_type = "Unknown" + return + sum_type = payload_slice.load_uint(32) + self.payload_sum_type = hex(sum_type) + # noinspection PyBroadException + try: + if sum_type == 0: + self.sum_type = "TextComment" + self.comment = payload_slice.load_snake_bytes() + elif sum_type == 0x2167da4b: + self.sum_type = "EncryptedTextComment" + self.comment = payload_slice.load_snake_bytes() + self.encrypted_comment = True + elif sum_type == 0x25938561: + self.stonfi_swap_body = { + 'jetton_wallet': payload_slice.load_address(), + 'min_amount': payload_slice.load_coins(), + 'user_address': payload_slice.load_address() + } + else: + self.sum_type = "Unknown" + except Exception: + self.sum_type = "Unknown" + + +class JettonBurn: + opcode = 0x595f07bc + + query_id: int + amount: int + response_destination: Address + + def __init__(self, slice: Slice): + slice.load_uint(32) + self.query_id = slice.load_uint(64) + self.amount = slice.load_coins() + self.response_destination = slice.load_address() + + +class JettonBurnNotification: + opcode = 0x7bdd97de + + +class JettonInternalTransfer: + opcode = 0x178d4519 + + query_id: int + amount: int + from_address: Address + response_address: Address + forward_ton_amount: int + + def __init__(self, slice: Slice): + slice.load_uint(32) + self.query_id 
= slice.load_uint(64)
+        self.amount = slice.load_coins()  # VarUInteger 16, not a fixed uint16
+        self.from_address = slice.load_address()
+        self.response_address = slice.load_address()
+        self.forward_ton_amount = slice.load_coins()  # VarUInteger 16, not a fixed uint16
+
+
+class JettonNotify:
+    opcode = 0x7362d09c
diff --git a/indexer/indexer/events/blocks/messages/nft.py b/indexer/indexer/events/blocks/messages/nft.py
new file mode 100644
index 0000000..5d9dc20
--- /dev/null
+++ b/indexer/indexer/events/blocks/messages/nft.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+from pytoniq_core import Slice, Address
+
+
+class TeleitemBidInfo:
+    bid: int
+    bid_ts: int
+
+    def __init__(self, slice: Slice):
+        self.bid = slice.load_coins()
+        self.bid_ts = slice.load_uint(32)
+
+
+class NftPayload:
+    op: int | None
+    value: TeleitemBidInfo | None
+    raw: bytes
+
+    def __init__(self, slice: Slice):
+        self.value = None
+        self.op = None
+        self.raw = slice.to_cell().to_boc(hash_crc32=True)
+        if slice.remaining_bits == 0 and slice.remaining_refs == 0:
+            return
+        tmp_cell = slice.copy()
+        try:
+            self.op = tmp_cell.load_uint(32) & 0xFFFFFFFF
+        except Exception:
+            return
+        if self.op == 0x38127de1:
+            self.value = TeleitemBidInfo(tmp_cell)
+
+
+class NftTransfer:
+    opcode = 0x5fcc3d14
+
+    def __init__(self, slice: Slice):
+        slice.load_uint(32)  # opcode
+        self.query_id = slice.load_uint(64)
+        self.new_owner = slice.load_address()
+        self.response_destination = slice.load_address()
+        custom_payload = slice.load_maybe_ref()
+        if custom_payload:
+            self.custom_payload = custom_payload.to_boc(hash_crc32=True)
+        else:
+            self.custom_payload = None
+        self.forward_amount = slice.load_coins()
+        self.forward_payload = None
+        if slice.remaining_bits > 0:
+            is_right = slice.load_bool()
+            forward_payload = slice.load_ref() if is_right else slice.copy().to_cell()
+            self.forward_payload = forward_payload.to_boc(hash_crc32=True)
+
+
+class NftOwnershipAssigned:
+    opcode = 0x05138d91
+
+    query_id: int
+    prev_owner: Address
+    nft_payload: NftPayload | None
+
+    def __init__(self, slice: Slice):
+        slice.load_uint(32)
+        self.query_id = slice.load_uint(64)
+        self.prev_owner = slice.load_address()
+        try:
+            if slice.load_bit():
+                self.nft_payload = NftPayload(slice.load_ref().to_slice())
+            else:
+                self.nft_payload = NftPayload(slice)
+        except Exception:
+            self.nft_payload = None
+
+
+class AuctionFillUp:
+    opcode = 0x370fec51
diff --git a/indexer/indexer/events/blocks/messages/subscriptions.py b/indexer/indexer/events/blocks/messages/subscriptions.py
new file mode 100644
index 0000000..e9dad88
--- /dev/null
+++ b/indexer/indexer/events/blocks/messages/subscriptions.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+from pytoniq_core import Slice, Address
+
+
+class SubscriptionPaymentRequest:
+    opcode = 0x706c7567
+
+    def __init__(self, slice: Slice):
+        slice.load_uint(32)  # opcode
+        self.query_id = slice.load_uint(64)
+        self.grams = slice.load_coins()
+
+
+class SubscriptionPaymentRequestResponse:
+    opcode = 0xf06c7567
+
+
+class SubscriptionPayment:
+    opcode = 0x73756273
+
+
+class WalletPluginDestruct:
+    opcode = 0x64737472
diff --git a/indexer/indexer/events/blocks/messages/swaps.py b/indexer/indexer/events/blocks/messages/swaps.py
new file mode 100644
index 0000000..17f83c0
--- /dev/null
+++ b/indexer/indexer/events/blocks/messages/swaps.py
@@ -0,0 +1,81 @@
+from pytoniq_core import Slice, Address
+
+from indexer.events.blocks.utils import Asset
+
+
+class StonfiSwapMessage:
+    opcode = 0x25938561
+
+    def __init__(self, body: Slice):
+        body.load_uint(32)  # opcode
+        self.query_id = body.load_uint(64)
+
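# The rest of the body mirrors the loads below: from_user_address
+        # and token_wallet as MsgAddress, amount and min_out as coins, a
+        # has_ref bit, then a ref cell holding from_real_user and, when
+        # has_ref is set, ref_address.
+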
self.from_user_address = body.load_address() + self.token_wallet = body.load_address() + self.amount = body.load_coins() + self.min_out = body.load_coins() + self.has_ref = body.load_bit() + ref = body.load_ref().to_slice() + self.from_real_user = ref.load_address() + self.ref_address = None + if self.has_ref: + self.ref_address = ref.load_address() + + +class StonfiPaymentRequest: + opcode = 0xf93bb43f + + def __init__(self, body: Slice): + body.load_uint(32) # opcode + self.query_id = body.load_uint(64) + self.owner = body.load_address() + self.exit_code = body.load_uint(32) + ref = body.load_ref().to_slice() + self.amount0_out = ref.load_coins() + self.token0_out = ref.load_address() + self.amount1_out = ref.load_coins() + self.token1_out = ref.load_address() + + +class DedustSwapNotification: + opcode = 0x9c610de3 + + def __init__(self, body: Slice): + body.load_uint(32) # opcode + self.asset_in = self.load_asset(body) + self.asset_out = self.load_asset(body) + self.amount_in = body.load_coins() + self.amount_out = body.load_coins() + ref = body.load_ref().to_slice() + self.sender_address = ref.load_address() + self.ref_address = ref.load_address() + self.reserve_0 = ref.load_coins() + self.reserve_1 = ref.load_coins() + + def load_asset(self, slice: Slice) -> Asset: + kind = slice.load_uint(4) + if kind == 0: + return Asset(True) + else: + wc = slice.load_uint(8) + account_id = slice.load_bytes(32) + return Asset(False, Address((wc, account_id))) + + +class DedustPayout: + opcode = 0x474f86cf + + +class DedustPayoutFromPool: + opcode = 0xad4eb6f5 + + +class DedustSwapPeer: + opcode = 0x72aca8aa + + +class DedustSwapExternal: + opcode = 0x61ee542d + + +class DedustSwap: + opcode = 0xea06185d diff --git a/indexer/indexer/events/blocks/nft.py b/indexer/indexer/events/blocks/nft.py new file mode 100644 index 0000000..ad25929 --- /dev/null +++ b/indexer/indexer/events/blocks/nft.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +import base64 + +from pytoniq_core import Slice + +from indexer.events import context +from indexer.events.blocks.utils.block_utils import find_messages +from indexer.core.database import SyncSessionMaker, NFTItem +from indexer.events.blocks.basic_blocks import CallContractBlock, TonTransferBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, OrMatcher, ContractMatcher +from indexer.events.blocks.core import Block +from indexer.events.blocks.messages import NftOwnershipAssigned, ExcessMessage +from indexer.events.blocks.messages.nft import NftTransfer, TeleitemBidInfo, AuctionFillUp +from indexer.events.blocks.utils import AccountId, Amount, block_utils +from indexer.events.blocks.utils.block_utils import find_call_contracts + + +class NftMintBlock(Block): + def __init__(self, data: dict): + super().__init__('nft_mint', [], data) + + +class NftTransferBlock(Block): + def __init__(self): + super().__init__('nft_transfer', [], None) + + +async def _get_nft_data(nft_address: AccountId): + data = { + "address": nft_address, + "index": None, + "collection": None, + "exists": False + } + nft = await context.interface_repository.get().get_nft_item(nft_address.as_str()) + if nft is not None: + data['index'] = nft.index + data['exists'] = True + if "uri" in nft.content and "https://nft.fragment.com" in nft.content["uri"]: + tokens = nft.content["uri"].split("/") + data["name"] = tokens[-1][:-5] + data["type"] = tokens[-2] + else: + data['meta'] = nft.content + if nft.collection_address is not None: + data['collection'] = { + 'address': 
AccountId(nft.collection_address), + } + return data + + +async def _try_get_nft_purchase_data(block: Block, owner: str) -> tuple[list[Block], float] | None: + prev_block = block.previous_block + event_node = block.previous_block.event_nodes[0] + if not isinstance(prev_block, TonTransferBlock) or event_node.message.source.upper() != owner.upper(): + return None + nft_sale = await context.interface_repository.get().get_nft_sale(event_node.message.transaction.account) + + price = 0 + block_to_include = [block.previous_block] + if nft_sale is not None: + return [block.previous_block], nft_sale.full_price + return None + + +class NftTransferBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=OrMatcher([ + ContractMatcher(opcode=NftOwnershipAssigned.opcode, optional=True), + ContractMatcher(opcode=ExcessMessage.opcode, optional=True) + ], optional=True), parent_matcher=None) + + def test_self(self, block: Block): + if isinstance(block, CallContractBlock) and block.opcode == NftTransfer.opcode: + return True + + async def build_block(self, block: Block, other_blocks: list['Block']): + new_block = NftTransferBlock() + include = [block] + data = dict() + data['is_purchase'] = False + nft_transfer_message = NftTransfer( + Slice.one_from_boc(block.event_nodes[0].message.message_content.body)) + ownership_assigned_message = find_messages(other_blocks, NftOwnershipAssigned) + if len(ownership_assigned_message) > 0: + nft_ownership_message = ownership_assigned_message[0][1] + data['prev_owner'] = AccountId(nft_ownership_message.prev_owner) + else: + data['prev_owner'] = AccountId(block.event_nodes[0].message.source) + data['query_id'] = nft_transfer_message.query_id + data['forward_amount'] = Amount(nft_transfer_message.forward_amount) + if nft_transfer_message.response_destination: + data['response_destination'] = AccountId(nft_transfer_message.response_destination) + else: + data['response_destination'] = None + data['custom_payload'] = base64.b64encode(nft_transfer_message.custom_payload).decode('utf-8') if ( + nft_transfer_message.custom_payload is not None) else None + data['forward_payload'] = base64.b64encode(nft_transfer_message.forward_payload).decode('utf-8') if ( + nft_transfer_message.forward_payload is not None) else None + data['new_owner'] = AccountId(nft_transfer_message.new_owner) + data['nft'] = await _get_nft_data(AccountId(block.event_nodes[0].message.transaction.account)) + if block.previous_block is not None and isinstance(block.previous_block, TonTransferBlock): + nft_purchase_data = await _try_get_nft_purchase_data(block, nft_transfer_message.new_owner.to_str(False)) + if nft_purchase_data is not None: + block_to_include, price = nft_purchase_data + data['is_purchase'] = True + data['price'] = Amount(price) + if isinstance(block.previous_block, TonTransferBlock): + include.append(block.previous_block) + + include.extend(other_blocks) + new_block.merge_blocks(include) + new_block.data = data + if not data['nft']['exists']: + new_block.broken = True + return [new_block] + + +class TelegramNftPurchaseBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=None, + parent_matcher=None) + + def test_self(self, block: Block): + if isinstance(block, CallContractBlock) and block.opcode == NftOwnershipAssigned.opcode: + return True + + async def build_block(self, block: Block, other_blocks: list['Block']): + assert isinstance(block, CallContractBlock) + new_block = NftTransferBlock() + include = [block] + data = dict() + 
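# Fragment/Telegram sales start from nft_ownership_assigned rather
+ # than an nft_transfer call, so transfer-only fields are left None and
+ # the price, when present, comes from the teleitem bid payload below.
+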
data['is_purchase'] = False + message = block.get_message() + nft_ownership_message = NftOwnershipAssigned(Slice.one_from_boc(message.message_content.body)) + data['new_owner'] = AccountId(message.destination) + data['query_id'] = nft_ownership_message.query_id + data['forward_amount'] = None + data['response_destination'] = None + data['custom_payload'] = None + data['forward_payload'] = None + data['nft'] = await _get_nft_data(AccountId(block.get_message().source)) + payload = nft_ownership_message.nft_payload + if payload is not None: + data['forward_payload'] = base64.b64encode(payload.raw).decode('utf-8') + if payload is not None and isinstance(payload.value, TeleitemBidInfo): + data['is_purchase'] = True + data['price'] = Amount(payload.value.bid) + prev_block = block.previous_block + if (isinstance(prev_block, TonTransferBlock) or + (isinstance(prev_block, CallContractBlock) and prev_block.get_message().source is None)): + include.extend(find_call_contracts(prev_block.next_blocks, AuctionFillUp.opcode)) + include.append(prev_block) + + include.extend(other_blocks) + new_block.merge_blocks(include) + new_block.data = data + if not data['nft']['exists']: + new_block.broken = True + return [new_block] + + +class NftMintBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=None, + parent_matcher=None) + + def test_self(self, block: Block): + return len(block.contract_deployments) == 1 + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + address = next(iter(block.contract_deployments)).as_str() + nft_item = await context.interface_repository.get().get_nft_item(address) + if nft_item is None: + return [] + source = block.event_nodes[0].message.source + data = { + "source": AccountId(source) if source else None, + "address": AccountId(address), + "index": nft_item.index, + "collection": AccountId(nft_item.collection_address) if nft_item.collection_address else None, + } + new_block = NftMintBlock(data) + new_block.merge_blocks([block]) + return [new_block] \ No newline at end of file diff --git a/indexer/indexer/events/blocks/subscriptions.py b/indexer/indexer/events/blocks/subscriptions.py new file mode 100644 index 0000000..a1292aa --- /dev/null +++ b/indexer/indexer/events/blocks/subscriptions.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from indexer.events.blocks.messages import JettonNotify, JettonInternalTransfer, ExcessMessage, JettonBurnNotification +from indexer.events.blocks.messages.subscriptions import SubscriptionPaymentRequestResponse, SubscriptionPayment, \ + SubscriptionPaymentRequest, WalletPluginDestruct +from indexer.events.blocks.utils.block_utils import find_call_contracts, find_messages, find_call_contract +from indexer.events import context +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, OrMatcher, ContractMatcher, child_sequence_matcher + +from indexer.events.blocks.core import Block +from indexer.events.blocks.utils import AccountId, Asset, Amount +from indexer.events.blocks.messages import JettonTransfer, JettonBurn +from indexer.core.database import JettonWallet + + +class SubscriptionBlock(Block): + def __init__(self, data): + super().__init__('subscribe', [], data) + + def __repr__(self): + return f"SUBSCRIPTION {self.event_nodes[0].message.transaction.hash}" + + +class UnsubscribeBlock(Block): + def __init__(self, data): + super().__init__('unsubscribe', [], data) + + def __repr__(self): + return 
f"UNSUBSCRIBE {self.event_nodes[0].message.transaction.hash}" + + +class SubscriptionBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=ContractMatcher(opcode=SubscriptionPayment.opcode), + parent_matcher=ContractMatcher(opcode=SubscriptionPaymentRequest.opcode, optional=True)) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == SubscriptionPaymentRequestResponse.opcode + + async def build_block(self, block: Block | CallContractBlock, other_blocks: list[Block]) -> list[Block]: + new_block = SubscriptionBlock({}) + subscriber = AccountId(block.get_message().source) + subscription = AccountId(block.get_message().destination) + amount = Amount(block.get_message().value) + failed = False + subscription_payment = find_call_contract(other_blocks, SubscriptionPayment.opcode) + beneficiary = AccountId(subscription_payment.get_message().destination) + + payment_request = find_call_contract(other_blocks, SubscriptionPaymentRequest.opcode) + if payment_request is not None: + payment_request_data = SubscriptionPaymentRequest(payment_request.get_body()) + amount = Amount(payment_request_data.grams) + failed = payment_request.failed + new_block.data = { + 'subscriber': subscriber, + 'subscription': subscription, + 'beneficiary': beneficiary, + 'amount': amount + } + new_block.failed = failed + new_block.merge_blocks([block] + other_blocks) + return [new_block] + + +class UnsubscribeBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(child_matcher=ContractMatcher(opcode=WalletPluginDestruct.opcode, optional=True)) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == WalletPluginDestruct.opcode + + async def build_block(self, block: Block | CallContractBlock, other_blocks: list[Block]) -> list[Block]: + new_block = UnsubscribeBlock({}) + data = { + 'subscriber': AccountId(block.get_message().source), + 'subscription': AccountId(block.get_message().destination), + 'beneficiary': None + } + response = find_call_contract(other_blocks, WalletPluginDestruct.opcode) + if response is not None: + data['beneficiary'] = AccountId(response.get_message().destination) + new_block.data = data + new_block.merge_blocks([block] + other_blocks) + return [new_block] diff --git a/indexer/indexer/events/blocks/swaps.py b/indexer/indexer/events/blocks/swaps.py new file mode 100644 index 0000000..c0a073b --- /dev/null +++ b/indexer/indexer/events/blocks/swaps.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +from indexer.events import context +from indexer.events.blocks.basic_blocks import CallContractBlock +from indexer.events.blocks.basic_matchers import BlockMatcher, child_sequence_matcher, ContractMatcher, \ + BlockTypeMatcher, OrMatcher +from indexer.events.blocks.core import Block, SingleLevelWrapper +from indexer.events.blocks.jettons import JettonTransferBlock +from indexer.events.blocks.messages import DedustPayout, DedustPayoutFromPool, DedustSwapPeer, DedustSwapExternal, \ + DedustSwap +from indexer.events.blocks.messages import StonfiSwapMessage, StonfiPaymentRequest, DedustSwapNotification +from indexer.events.blocks.utils import AccountId, Asset, Amount +from indexer.events.blocks.utils.block_utils import find_call_contracts, find_messages + + +class JettonSwapBlock(Block): + def __init__(self, data): + super().__init__('jetton_swap', [], data) + + def __repr__(self): + return f"jetton_swap {self.data}" + + +async def _get_block_data(other_blocks): + 
swap_call_block = next(x for x in other_blocks if isinstance(x, CallContractBlock) and + x.opcode == StonfiSwapMessage.opcode) + swap_message = StonfiSwapMessage(swap_call_block.get_body()) + payment_requests_messages = [(StonfiPaymentRequest(x.get_body()), x) for x in + find_call_contracts(other_blocks, StonfiPaymentRequest.opcode)] + assert len(payment_requests_messages) > 0 + if len(payment_requests_messages) > 2: + print("Multiple payment requests found ", swap_call_block.event_nodes[0].message.trace_id) + + out_amt = None + out_addr = None + ref_amt = None + ref_addr = None + outgoing_jetton_transfer = None + in_jetton_transfer = swap_call_block.previous_block + + # Find payment request and outgoing jetton transfer + for payment_request, payment_request_block in payment_requests_messages: + if payment_request.amount0_out > 0: + amount = payment_request.amount0_out + addr = payment_request.token0_out + else: + amount = payment_request.amount1_out + addr = payment_request.token1_out + if payment_request.owner == swap_message.from_user_address: + if out_amt is None: + outgoing_jetton_transfer = next(b for b in payment_request_block.next_blocks + if isinstance(b, JettonTransferBlock)) + out_amt = amount + out_addr = addr + elif out_amt < amount: + outgoing_jetton_transfer = next(b for b in payment_request_block.next_blocks + if isinstance(b, JettonTransferBlock)) + ref_amt = out_amt + ref_addr = out_addr + out_amt = amount + out_addr = addr + else: + ref_amt = amount + ref_addr = addr + + out_wallet = await context.interface_repository.get().get_jetton_wallet(out_addr.to_str(False).upper()) + dex_in_wallet = await context.interface_repository.get().get_jetton_wallet( + swap_message.token_wallet.to_str(False).upper()) + out_jetton = AccountId(out_wallet.jetton) if out_wallet is not None else None + in_jetton = AccountId(dex_in_wallet.jetton) if dex_in_wallet is not None else None + + in_source_jetton_wallet = None + if in_jetton_transfer.data['has_internal_transfer']: + in_source_jetton_wallet = in_jetton_transfer.data['sender_wallet'] + + out_destination_jetton_wallet = None + if outgoing_jetton_transfer.data['has_internal_transfer']: + out_destination_jetton_wallet = outgoing_jetton_transfer.data['receiver_wallet'] + + incoming_transfer = { + 'asset': Asset(is_ton=in_jetton is None, jetton_address=in_jetton), + 'amount': Amount(swap_message.amount), + 'source': AccountId(swap_message.from_user_address), + 'source_jetton_wallet': in_source_jetton_wallet, + 'destination': AccountId(dex_in_wallet.owner), + 'destination_jetton_wallet': AccountId(swap_message.token_wallet), + } + + outgoing_transfer = { + 'asset': Asset(is_ton=out_jetton is None, jetton_address=out_jetton), + 'amount': Amount(out_amt), + 'source': outgoing_jetton_transfer.data['sender'], + 'source_jetton_wallet': outgoing_jetton_transfer.data['sender_wallet'] + } + if out_destination_jetton_wallet is not None: + outgoing_transfer['destination_jetton_wallet'] = out_destination_jetton_wallet + outgoing_transfer['destination'] = outgoing_jetton_transfer.data['receiver'] + elif in_jetton_transfer.data['stonfi_swap_body'] is not None: + outgoing_transfer['destination'] = AccountId(in_jetton_transfer.data['stonfi_swap_body']['user_address']) + outgoing_transfer['destination_jetton_wallet'] = None + else: + outgoing_transfer['destination'] = AccountId(swap_message.from_user_address) + outgoing_transfer['destination_jetton_wallet'] = None + + return { + 'dex': 'stonfi', + 'sender': AccountId(swap_message.from_user_address), + 
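# The two transfer dicts below describe the legs into and out of the
+ # DEX; referral fields are only meaningful when a second payout was
+ # found above.
+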
'dex_incoming_transfer': incoming_transfer, + 'dex_outgoing_transfer': outgoing_transfer, + 'referral_amount': Amount(ref_amt), + 'referral_address': ref_addr, + 'peer_swaps': [], + } + + +class StonfiSwapBlockMatcher(BlockMatcher): + def __init__(self): + super().__init__(parent_matcher=None, optional=False, + child_matcher=child_sequence_matcher([ + ContractMatcher(opcode=StonfiSwapMessage.opcode), + ContractMatcher(opcode=StonfiPaymentRequest.opcode), + BlockTypeMatcher(block_type='jetton_transfer') + ])) + + def test_self(self, block: Block): + return isinstance(block, JettonTransferBlock) + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + data = await _get_block_data(other_blocks) + new_block = JettonSwapBlock(data) + include = [block] + include.extend(other_blocks) + new_block.merge_blocks(include) + return [new_block] + + +class DedustPeerBlockMatcher(BlockMatcher): + + def __init__(self): + super().__init__(parent_matcher=None, optional=False, child_matcher=None, + children_matchers=[ContractMatcher(opcode=DedustSwapNotification.opcode), + ContractMatcher(opcode=DedustPayoutFromPool.opcode, + child_matcher=OrMatcher([ + ContractMatcher(opcode=DedustPayout.opcode), + BlockTypeMatcher(block_type='jetton_transfer') + ]), + optional=True)]) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == DedustSwapPeer.opcode + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + wrapper = SingleLevelWrapper() + wrapper.wrap([block] + other_blocks) + return [wrapper] + + +class DedustSwapBlockMatcher(BlockMatcher): + def __init__(self): + ton_swap_parent_matchers = ContractMatcher(opcode=DedustSwapExternal.opcode, + parent_matcher=ContractMatcher(opcode=DedustSwap.opcode, + optional=True)) + + super().__init__(optional=False, child_matcher=None, + parent_matcher=ContractMatcher(opcode=DedustSwapExternal.opcode, + child_matcher=OrMatcher([ + DedustPeerBlockMatcher(), + child_sequence_matcher([ + ContractMatcher(opcode=DedustPayoutFromPool.opcode), + OrMatcher([ + BlockTypeMatcher(block_type='jetton_transfer'), + ContractMatcher(opcode=DedustPayout.opcode) + ])])]), + parent_matcher=OrMatcher([BlockTypeMatcher( + block_type='jetton_transfer'), + ContractMatcher(opcode=DedustSwap.opcode)]))) + + def test_self(self, block: Block): + return isinstance(block, CallContractBlock) and block.opcode == DedustSwapNotification.opcode + + async def build_block(self, block: Block, other_blocks: list[Block]) -> list[Block]: + new_block = JettonSwapBlock({}) + include = [block] + for b in other_blocks: + if isinstance(b, SingleLevelWrapper): + include.extend(b.children_blocks) + else: + include.append(b) + include.extend(other_blocks) + new_block.merge_blocks(include) + + messages = find_messages(new_block.children_blocks, DedustSwapNotification) + messages.sort(key=lambda x: x[0].min_lt) + peer_swaps = [] + for _, message in messages: + data = { + 'in': { + 'amount': Amount(message.amount_in), + 'asset': Asset(is_ton=message.asset_in.is_ton, + jetton_address=message.asset_in.jetton_address), + }, + 'out': { + 'amount': Amount(message.amount_out), + 'asset': Asset(is_ton=message.asset_out.is_ton, + jetton_address=message.asset_out.jetton_address), + } + } + peer_swaps.append(data) + sender = AccountId(messages[0][1].sender_address) + sender_jetton_transfer_blocks = [x for x in new_block.children_blocks if isinstance(x, JettonTransferBlock) + and x.min_lt <= block.min_lt and 
x.data['sender'] == sender] + sender_wallet = None + dex_incoming_jetton_wallet = None + dex_incoming_wallet = None + if len(sender_jetton_transfer_blocks) > 0: + dex_incoming_jetton_wallet = sender_jetton_transfer_blocks[0].data['receiver_wallet'] + dex_incoming_wallet = sender_jetton_transfer_blocks[0].data['receiver'] + sender_wallet = sender_jetton_transfer_blocks[0].data['sender_wallet'] + else: + swap_requests = find_call_contracts(other_blocks, DedustSwap.opcode) + if len(swap_requests) > 0: + dex_incoming_wallet = AccountId(swap_requests[0].get_message().destination) + + + receiver_jetton_transfer_blocks = [x for x in new_block.children_blocks if isinstance(x, JettonTransferBlock) + and x.min_lt >= block.min_lt and x.data['receiver'] == sender] + receiver = sender + receiver_wallet = None + dex_outgoing_jetton_wallet = None + dex_outgoing_wallet = None + if len(receiver_jetton_transfer_blocks) > 0: + receiver_wallet = receiver_jetton_transfer_blocks[0].data['receiver_wallet'] + dex_outgoing_wallet = receiver_jetton_transfer_blocks[0].data['sender'] + dex_outgoing_jetton_wallet = receiver_jetton_transfer_blocks[0].data['sender_wallet'] + else: + payouts = find_call_contracts(other_blocks, DedustPayout.opcode) + if len(payouts) > 0: + dex_outgoing_wallet = AccountId(payouts[0].get_message().source) + receiver = AccountId(payouts[0].get_message().destination) + + new_block.data = { + 'dex': 'dedust', + 'sender': sender, + 'dex_incoming_transfer': { + 'asset': peer_swaps[0]['in']['asset'], + 'amount': peer_swaps[0]['in']['amount'], + 'source': sender, + 'source_jetton_wallet': sender_wallet, + 'destination': dex_incoming_wallet, + 'destination_jetton_wallet': dex_incoming_jetton_wallet, + }, + 'dex_outgoing_transfer': { + 'asset': peer_swaps[-1]['out']['asset'], + 'amount': peer_swaps[-1]['out']['amount'], + 'source': dex_outgoing_wallet, + 'source_jetton_wallet': dex_outgoing_jetton_wallet, + 'destination': receiver, + 'destination_jetton_wallet': receiver_wallet, + }, + 'in': peer_swaps[0]['in'], + 'out': peer_swaps[-1]['out'], + 'peer_swaps': peer_swaps if len(peer_swaps) > 1 else [] + } + return [new_block] diff --git a/indexer/indexer/events/blocks/utils/__init__.py b/indexer/indexer/events/blocks/utils/__init__.py new file mode 100644 index 0000000..b4ae091 --- /dev/null +++ b/indexer/indexer/events/blocks/utils/__init__.py @@ -0,0 +1,2 @@ +from .ton_utils import * +from .tree_utils import * \ No newline at end of file diff --git a/indexer/indexer/events/blocks/utils/block_tree_serializer.py b/indexer/indexer/events/blocks/utils/block_tree_serializer.py new file mode 100644 index 0000000..b6d52e8 --- /dev/null +++ b/indexer/indexer/events/blocks/utils/block_tree_serializer.py @@ -0,0 +1,271 @@ +from __future__ import annotations + +import base64 +import hashlib +import random +from typing import Tuple, List + +from indexer.events.blocks.utils import AccountId, Asset +from indexer.core.database import Action +from indexer.events.blocks.basic_blocks import CallContractBlock, TonTransferBlock +from indexer.events.blocks.core import Block +from indexer.events.blocks.dns import ChangeDnsRecordBlock, DeleteDnsRecordBlock +from indexer.events.blocks.jettons import JettonTransferBlock, JettonBurnBlock +from indexer.events.blocks.nft import NftTransferBlock, NftMintBlock +from indexer.events.blocks.subscriptions import SubscriptionBlock, UnsubscribeBlock +from indexer.events.blocks.swaps import JettonSwapBlock + + +def _addr(addr: AccountId | Asset | None) -> str | None: + if addr is 
None: + return None + if isinstance(addr, Asset): + return addr.jetton_address.as_str() if addr.jetton_address is not None else None + else: + return addr.as_str() + + +def _calc_action_id(block: Block) -> str: + root_event_node = min(block.event_nodes, key=lambda n:n.get_lt()) + key = "" + if root_event_node.message is not None: + key = root_event_node.message.msg_hash + else: + key = root_event_node.get_tx_hash() + key += block.btype + h = hashlib.sha256(key.encode()) + return base64.b64encode(h.digest()).decode() + + +def _base_block_to_action(block: Block, trace_id: str) -> Action: + action_id = _calc_action_id(block) + tx_hashes = list(set(n.get_tx_hash() for n in block.event_nodes)) + return Action( + trace_id=trace_id, + type=block.btype, + action_id=action_id, + tx_hashes=tx_hashes, + start_lt=block.min_lt, + end_lt=block.max_lt, + start_utime=block.min_utime, + end_utime=block.max_utime, + success=not block.failed) + + +def _fill_call_contract_action(block: CallContractBlock, action: Action): + action.opcode = block.opcode + action.value = block.data['value'].value + action.source = block.data['source'].as_str() if block.data['source'] is not None else None + action.destination = block.data['destination'].as_str() if block.data['destination'] is not None else None + + +def _fill_ton_transfer_action(block: TonTransferBlock, action: Action): + action.value = block.value + action.source = block.data['source'].as_str() + if block.data['destination'] is None: + print("Something very wrong", block.event_nodes[0].message.trace_id) + action.destination = block.data['destination'].as_str() + content = block.data['comment'].replace("\u0000", "") if block.data['comment'] is not None else None + action.ton_transfer_data = {'content': content, 'encrypted': block.data['encrypted']} + + +def _fill_jetton_transfer_action(block: JettonTransferBlock, action: Action): + action.source = block.data['sender'].as_str() + action.source_secondary = block.data['sender_wallet'].as_str() + action.destination = block.data['receiver'].as_str() + action.destination_secondary = block.data['receiver_wallet'].as_str() if 'receiver_wallet' in block.data else None + action.amount = block.data['amount'].value + asset = block.data['asset'] + if asset is None or asset.is_ton: + action.asset = None + else: + action.asset = asset.jetton_address.as_str() + comment = None + if block.data['comment'] is not None: + if block.data['encrypted_comment']: + comment = base64.b64encode(block.data['comment']).decode('utf-8') + else: + comment = block.data['comment'].decode('utf-8').replace("\u0000", "") + action.jetton_transfer_data = { + 'query_id': block.data['query_id'], + 'response_destination': block.data['response_address'].as_str() if block.data[ + 'response_address'] is not None else None, + 'forward_amount': block.data['forward_amount'].value, + 'custom_payload': block.data['custom_payload'], + 'forward_payload': block.data['forward_payload'], + 'comment': comment, + 'is_encrypted_comment': block.data['encrypted_comment'] + } + + +def _fill_nft_transfer_action(block: NftTransferBlock, action: Action): + if 'prev_owner' in block.data and block.data['prev_owner'] is not None: + action.source = block.data['prev_owner'].as_str() + action.destination = block.data['new_owner'].as_str() + action.asset_secondary = block.data['nft']['address'].as_str() + if block.data['nft']['collection'] is not None: + action.asset = block.data['nft']['collection']['address'].as_str() + action.nft_transfer_data = { + 'query_id': 
block.data['query_id'], + 'is_purchase': block.data['is_purchase'], + 'price': block.data['price'].value if 'price' in block.data and block.data['is_purchase'] else None, + 'nft_item_index': block.data['nft']['index'], + 'forward_amount': block.data['forward_amount'].value if block.data['forward_amount'] is not None else None, + 'custom_payload': block.data['custom_payload'], + 'forward_payload': block.data['forward_payload'], + 'response_destination': block.data['response_destination'].as_str() if block.data['response_destination'] else None, + } + + +def _fill_nft_mint_action(block: NftMintBlock, action: Action): + if block.data["source"]: + action.source = block.data["source"].as_str() + action.destination = block.data["address"].as_str() + action.asset_secondary = action.destination + if block.data["collection"]: + action.asset = block.data["collection"].as_str() + action.nft_mint_data = { + 'nft_item_index': block.data["index"], + } + + +def _convert_peer_swap(peer_swap: dict) -> dict: + in_obj = peer_swap['in'] + out_obj = peer_swap['out'] + return { + 'amount_in': in_obj['amount'].value, + 'asset_in': in_obj['asset'].jetton_address.as_str() if in_obj['asset'].jetton_address is not None else None, + 'amount_out': out_obj['amount'].value, + 'asset_out': out_obj['asset'].jetton_address.as_str() if out_obj['asset'].jetton_address is not None else None, + } + + +def _fill_jetton_swap_action(block: JettonSwapBlock, action: Action): + dex_incoming_transfer = { + 'amount': block.data['dex_incoming_transfer']['amount'].value, + 'source': _addr(block.data['dex_incoming_transfer']['source']), + 'source_jetton_wallet': _addr(block.data['dex_incoming_transfer']['source_jetton_wallet']), + 'destination': _addr(block.data['dex_incoming_transfer']['destination']), + 'destination_jetton_wallet': _addr(block.data['dex_incoming_transfer']['destination_jetton_wallet']), + 'asset': _addr(block.data['dex_incoming_transfer']['asset']) + } + dex_outgoing_transfer = { + 'amount': block.data['dex_outgoing_transfer']['amount'].value, + 'source': _addr(block.data['dex_outgoing_transfer']['source']), + 'source_jetton_wallet': _addr(block.data['dex_outgoing_transfer']['source_jetton_wallet']), + 'destination': _addr(block.data['dex_outgoing_transfer']['destination']), + 'destination_jetton_wallet': _addr(block.data['dex_outgoing_transfer']['destination_jetton_wallet']), + 'asset': _addr(block.data['dex_outgoing_transfer']['asset']) + } + action.asset = dex_incoming_transfer['asset'] + action.asset2 = dex_outgoing_transfer['asset'] + action.source = dex_incoming_transfer['source'] + action.source_secondary = dex_incoming_transfer['source_jetton_wallet'] + action.destination = dex_outgoing_transfer['destination'] + action.destination_secondary = dex_outgoing_transfer['destination_jetton_wallet'] + action.jetton_swap_data = { + 'dex': block.data['dex'], + 'sender': _addr(block.data['sender']), + 'dex_incoming_transfer': dex_incoming_transfer, + 'dex_outgoing_transfer': dex_outgoing_transfer, + } + + +def _fill_jetton_burn_action(block: JettonBurnBlock, action: Action): + action.source = block.data['owner'].as_str() + action.source_secondary = block.data['jetton_wallet'].as_str() + action.asset = block.data['asset'].jetton_address.as_str() + action.amount = block.data['amount'].value + + +def _fill_change_dns_record_action(block: ChangeDnsRecordBlock, action: Action): + action.source = block.data['source'].as_str() if block.data['source'] is not None else None + action.destination = 
block.data['destination'].as_str() + dns_record_data = block.data['value'] + data = { + 'value_schema': dns_record_data['schema'], + 'flags': None, + 'address': None, + 'key': block.data['key'].hex(), + } + if data['value_schema'] in ('DNSNextResolver', 'DNSSmcAddress'): + data['address'] = dns_record_data['address'].as_str() + elif data['value_schema'] == 'DNSAdnlAddress': + data['address'] = dns_record_data['address'].hex() + data['flags'] = dns_record_data['flags'] + if data['value_schema'] == 'DNSSmcAddress': + data['flags'] = dns_record_data['flags'] + if data['value_schema'] == 'DNSText': + data['dns_text'] = dns_record_data['dns_text'] + action.change_dns_record_data = data + + +def _fill_delete_dns_record_action(block: DeleteDnsRecordBlock, action: Action): + action.source = block.data['source'].as_str() if block.data['source'] is not None else None + action.destination = block.data['destination'].as_str() + data = { + 'value_schema': None, + 'flags': None, + 'address': None, + 'key': block.data['key'].hex(), + } + action.change_dns_record_data = data + + +def _fill_subscribe_action(block: SubscriptionBlock, action: Action): + action.source = block.data['subscriber'].as_str() + action.destination = block.data['beneficiary'].as_str() if block.data['beneficiary'] is not None else None + action.destination_secondary = block.data['subscription'].as_str() + action.amount = block.data['amount'].value + + +def _fill_unsubscribe_action(block: UnsubscribeBlock, action: Action): + action.source = block.data['subscriber'].as_str() + action.destination = block.data['beneficiary'].as_str() if block.data['beneficiary'] is not None else None + action.destination_secondary = block.data['subscription'].as_str() + + +def _fill_election_action(block: Block, action: Action): + action.source = block.data['stake_holder'].as_str() + action.amount = block.data['amount'].value if 'amount' in block.data else None + + +def _fill_auction_bid_action(block: Block, action: Action): + action.source = block.data['bidder'].as_str() + action.destination = block.data['auction'].as_str() + action.asset_secondary = block.data['nft_address'].as_str() + action.value = block.data['amount'].value + + +# noinspection PyCompatibility,PyTypeChecker +def block_to_action(block: Block, trace_id: str) -> Action: + action = _base_block_to_action(block, trace_id) + match block.btype: + case 'call_contract': + _fill_call_contract_action(block, action) + case 'ton_transfer': + _fill_ton_transfer_action(block, action) + case 'jetton_transfer': + _fill_jetton_transfer_action(block, action) + case 'nft_transfer': + _fill_nft_transfer_action(block, action) + case 'nft_mint': + _fill_nft_mint_action(block, action) + case 'jetton_burn': + _fill_jetton_burn_action(block, action) + case 'jetton_swap': + _fill_jetton_swap_action(block, action) + case 'change_dns': + _fill_change_dns_record_action(block, action) + case 'delete_dns': + _fill_delete_dns_record_action(block, action) + case 'subscribe': + _fill_subscribe_action(block, action) + case 'unsubscribe': + _fill_unsubscribe_action(block, action) + case 'election_deposit' | 'election_recover': + _fill_election_action(block, action) + case 'auction_bid': + _fill_auction_bid_action(block, action) + + return action diff --git a/indexer/indexer/events/blocks/utils/block_utils.py b/indexer/indexer/events/blocks/utils/block_utils.py new file mode 100644 index 0000000..ffc8e7e --- /dev/null +++ b/indexer/indexer/events/blocks/utils/block_utils.py @@ -0,0 +1,33 @@ +from __future__ import 
diff --git a/indexer/indexer/events/blocks/utils/block_utils.py b/indexer/indexer/events/blocks/utils/block_utils.py
new file mode 100644
index 0000000..ffc8e7e
--- /dev/null
+++ b/indexer/indexer/events/blocks/utils/block_utils.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+from typing import TypeVar, Type
+
+from indexer.events.blocks.basic_blocks import CallContractBlock, TonTransferBlock
+from indexer.events.blocks.core import Block, AccountFlow, AccountValueFlow
+from indexer.events.blocks.utils import EventNode
+
+T = TypeVar('T')
+
+
+def find_call_contracts(blocks: list[Block], opcode: int | set) -> list[CallContractBlock]:
+    if isinstance(opcode, int):
+        return [b for b in blocks if isinstance(b, CallContractBlock) and b.opcode == opcode]
+    else:
+        return [b for b in blocks if isinstance(b, CallContractBlock) and b.opcode in opcode]
+
+
+def find_call_contract(blocks: list[Block], opcode: int) -> CallContractBlock | None:
+    for b in blocks:
+        if isinstance(b, CallContractBlock) and b.opcode == opcode:
+            return b
+    return None
+
+
+def find_messages(blocks: list[Block], message_class: Type[T]) -> list[tuple[Block, T]]:
+    return [(b, message_class(b.get_body())) for b in find_call_contracts(blocks, message_class.opcode)]
+
+
+def merge_flows(blocks: list[Block]) -> AccountValueFlow:
+    flow = AccountValueFlow()
+    for block in blocks:
+        flow.merge(block.value_flow)
+    return flow
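These helpers are the scanning primitives the matchers build on. A small sketch, assuming `candidate_blocks` is any `list[Block]` gathered from a trace tree; the opcode constant is purely illustrative, not something this module defines:

```python
from indexer.events.blocks.utils.block_utils import find_call_contract

OP_EXAMPLE = 0x178d4519  # illustrative opcode value

call = find_call_contract(candidate_blocks, OP_EXAMPLE)
if call is not None:
    body = call.get_body()  # raw body, typically fed to a message class
```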
diff --git a/indexer/indexer/events/blocks/utils/event_deserializer.py b/indexer/indexer/events/blocks/utils/event_deserializer.py
new file mode 100644
index 0000000..0f798c6
--- /dev/null
+++ b/indexer/indexer/events/blocks/utils/event_deserializer.py
@@ -0,0 +1,182 @@
+from __future__ import annotations
+
+import base64
+
+import msgpack
+
+from indexer.core.database import MessageContent, Transaction, Message, Trace, TraceEdge
+
+account_status_map = ['uninit', 'frozen', 'active', 'nonexist']
+
+
+def _message_from_tuple(tx: Transaction, data, direction: str) -> Message:
+    (msg_hash, source, destination, value, fwd_fee, ihr_fee, created_lt, created_at, opcode, ihr_disabled, bounce,
+     bounced, import_fee, body_boc, init_state_boc) = data
+    message_content = MessageContent(hash='', body=body_boc)
+    message = Message(
+        msg_hash=msg_hash,
+        tx_hash=tx.hash,
+        tx_lt=tx.lt,
+        source=source,
+        destination=destination,
+        direction=direction,
+        value=value,
+        fwd_fee=fwd_fee,
+        ihr_fee=ihr_fee,
+        created_lt=created_lt,
+        created_at=created_at,
+        opcode=opcode,
+        ihr_disabled=ihr_disabled,
+        bounce=bounce,
+        bounced=bounced,
+        import_fee=import_fee,
+        message_content=message_content,
+        transaction=tx,
+    )
+    if init_state_boc is not None:
+        message.init_state = MessageContent(hash='', body=init_state_boc)
+    return message
+
+
+def _tx_description_from_tuple(data):
+    (credit_first, storage_ph_tuple, credit_ph_tuple, compute_ph_tuple, action, aborted, bounce, destroyed) = data
+    storage_ph = {
+        'storage_fees_collected': storage_ph_tuple[0],
+        'storage_fees_due': storage_ph_tuple[1],
+        'status_change': storage_ph_tuple[2]
+    }
+    credit_ph = {
+        'due_fees_collected': credit_ph_tuple[0],
+        'credit': credit_ph_tuple[1],
+    }
+    compute_ph_type = compute_ph_tuple[0]
+    compute_ph = None
+    if compute_ph_type == 0:
+        compute_ph = {
+            'type': 'skipped',
+            'reason': compute_ph_tuple[1][0]
+        }
+    else:
+        compute_ph = {
+            'type': 'vm',
+            'success': compute_ph_tuple[1][0],
+            'msg_state_used': compute_ph_tuple[1][1],
+            'account_activated': compute_ph_tuple[1][2],
+            'gas_fees': compute_ph_tuple[1][3],
+            'gas_used': compute_ph_tuple[1][4],
+            'gas_limit': compute_ph_tuple[1][5],
+            'gas_credit': compute_ph_tuple[1][6],
+            'mode': compute_ph_tuple[1][7],
+            'exit_code': compute_ph_tuple[1][8],
+            'exit_arg': compute_ph_tuple[1][9],
+            'vm_steps': compute_ph_tuple[1][10],
+            'vm_init_state_hash': compute_ph_tuple[1][11],
+            'vm_final_state_hash': compute_ph_tuple[1][12],
+        }
+    return {
+        'credit_first': credit_first,
+        'storage_ph': storage_ph,
+        'credit_ph': credit_ph,
+        'compute_ph': compute_ph,
+        'aborted': aborted,
+        'bounce': bounce,
+        'destroyed': destroyed,
+    }
+
+def fill_tx_description(tx: Transaction, data):
+    (credit_first, storage_ph_tuple, credit_ph_tuple, compute_ph_tuple, action, aborted, bounce, destroyed) = data
+    tx.descr = 'ord'
+    tx.credit_first = credit_first
+    tx.aborted = aborted
+    tx.bounce = bounce
+    tx.destroyed = destroyed
+    tx.storage_fees_collected = storage_ph_tuple[0]
+    tx.storage_fees_due = storage_ph_tuple[1]
+    match storage_ph_tuple[2]:
+        case 0:
+            tx.storage_fees_change = 'unchanged'
+        case 1:
+            tx.storage_fees_change = 'frozen'
+        case 2:
+            tx.storage_fees_change = 'deleted'
+    tx.due_fees_collected = credit_ph_tuple[0]
+    tx.credit = credit_ph_tuple[1]
+    match compute_ph_tuple[0]:
+        case 0:
+            tx.compute_skipped = True
+            tx.skipped_reason = compute_ph_tuple[1][0]
+        case 1:
+            tx.compute_mode = 'vm'
+            tx.compute_success = compute_ph_tuple[1][0]
+            tx.compute_msg_state_used = compute_ph_tuple[1][1]
+            tx.compute_account_activated = compute_ph_tuple[1][2]
+            tx.compute_gas_fees = compute_ph_tuple[1][3]
+            tx.compute_gas_used = compute_ph_tuple[1][4]
+            tx.compute_gas_limit = compute_ph_tuple[1][5]
+            tx.compute_gas_credit = compute_ph_tuple[1][6]
+            tx.compute_mode = compute_ph_tuple[1][7]
+            tx.compute_exit_code = compute_ph_tuple[1][8]
+            tx.compute_exit_arg = compute_ph_tuple[1][9]
+            tx.compute_vm_steps = compute_ph_tuple[1][10]
+            tx.compute_vm_init_state_hash = compute_ph_tuple[1][11]
+            tx.compute_vm_final_state_hash = compute_ph_tuple[1][12]
+    if action is not None:
+        tx.action_success = action[0]
+        tx.action_valid = action[1]
+        tx.action_no_funds = action[2]
+        tx.action_status_change = action[3]
+        tx.action_total_fwd_fees = action[4]
+        tx.action_total_action_fees = action[5]
+        tx.action_result_code = action[6]
+        tx.action_result_arg = action[7]
+        tx.action_tot_actions = action[8]
+        tx.action_spec_actions = action[9]
+        tx.action_skipped_actions = action[10]
+        tx.action_msgs_created = action[11]
+        tx.action_action_list_hash = action[12]
+        tx.action_tot_msg_size_cells = action[13][0]
+        tx.action_tot_msg_size_bits = action[13][1]
+
+def unpack_messagepack_tx(data: bytes) -> Transaction:
+    (tx_data, emulated) = msgpack.unpackb(data, raw=False)
+    (tx_hash, account, lt, prev_trans_hash, prev_trans_lt, now, orig_status, end_status, in_msg, out_msgs, total_fees,
+     account_state_hash_before, account_state_hash_after, description) = tx_data
+    tx = Transaction(
+        lt=lt,
+        hash=tx_hash,
+        prev_trans_hash=prev_trans_hash,
+        prev_trans_lt=prev_trans_lt,
+        account=account,
+        now=now,
+        orig_status=account_status_map[orig_status],
+        end_status=account_status_map[end_status],
+        total_fees=total_fees,
+        account_state_hash_before=account_state_hash_before,
+        account_state_hash_after=account_state_hash_after,
+        emulated=emulated
+    )
+    fill_tx_description(tx, description)
+    tx.messages = [_message_from_tuple(tx, msg, 'out') for msg in out_msgs] + [
+        _message_from_tuple(tx, in_msg, 'in')]
+    return tx
+
+
+def deserialize_event(trace_id, packed_transactions_map: dict[str, bytes]) -> Trace:
+    edges = []
+    transactions = []
+    root = packed_transactions_map[trace_id]
+
+    def load_leaf(tx):
+        for msg in tx.messages:
+            if msg.direction != 'out':
+                continue
+            child_tx = unpack_messagepack_tx(packed_transactions_map[msg.msg_hash])
+            edges.append(TraceEdge(left_tx=tx.hash, right_tx=child_tx.hash, msg_hash=msg.msg_hash, trace_id=trace_id))
+            transactions.append(child_tx)
+            load_leaf(child_tx)
+
+    root_tx = unpack_messagepack_tx(root)
+    transactions.append(root_tx)
+    load_leaf(root_tx)
+    return Trace(transactions=transactions, edges=edges, trace_id=trace_id, classification_state='unclassified', state='complete')
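`deserialize_event` rebuilds a full `Trace` from per-transaction msgpack blobs keyed by message hash, with the `trace_id` key holding the root transaction's blob. A minimal usage sketch; `load_packed_txs` is a hypothetical loader (e.g. reading a Redis hash), not part of this module:

```python
# Sketch: hydrate one trace from packed transaction blobs.
packed: dict[str, bytes] = load_packed_txs(trace_id)  # hypothetical loader
trace = deserialize_event(trace_id, packed)
print(trace.classification_state)  # 'unclassified' until the classifier runs
```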
diff --git a/indexer/indexer/events/blocks/utils/ton_utils.py b/indexer/indexer/events/blocks/utils/ton_utils.py
new file mode 100644
index 0000000..7a3d366
--- /dev/null
+++ b/indexer/indexer/events/blocks/utils/ton_utils.py
@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+from pytoniq_core import Address
+
+from indexer.core.database import Transaction
+
+
+class Asset:
+    is_ton: bool
+    is_jetton: bool
+    jetton_address: AccountId | None
+
+    def __init__(self, is_ton: bool, jetton_address: Address | AccountId | str | None = None):
+        self.is_ton = is_ton
+        self.is_jetton = jetton_address is not None
+        if isinstance(jetton_address, str):
+            self.jetton_address = AccountId(jetton_address)
+        elif isinstance(jetton_address, Address):
+            self.jetton_address = AccountId(jetton_address)
+        else:
+            self.jetton_address = jetton_address
+
+    def to_json(self):
+        if self.is_ton:
+            return "TON"
+        else:
+            return self.jetton_address.to_json()
+
+    def __repr__(self):
+        return self.to_json()
+
+
+def is_failed(tx: Transaction):
+    description = tx.description
+    if "compute_ph" in description:
+        compute_type = description["compute_ph"]["type"]
+        if compute_type == "skipped":
+            return False
+        elif compute_type == "vm":
+            return description["compute_ph"]["exit_code"] != 0
+    return False  # no compute phase recorded: treat the transaction as not failed
+
+
+class AccountId:
+    def __init__(self, address: str | Address):
+        if isinstance(address, str):
+            if address == 'addr_none':
+                self.address = None
+            else:
+                self.address = Address(address)
+        else:
+            self.address = address
+
+    def __repr__(self):
+        return self.address.to_str(False)
+
+    def __eq__(self, other):
+        return self.address == other.address
+
+    def __hash__(self):
+        return hash(self.as_bytes())
+
+    def as_bytes(self):
+        if self.address is None:
+            return None
+        return self.address.wc.to_bytes(1, byteorder="big", signed=True) + self.address.hash_part
+
+    def as_str(self):
+        if self.address is None:
+            return None
+        return self.address.to_str(False).upper()
+
+    def to_json(self):
+        return self.as_str()
+
+
+class Amount:
+    value: int
+
+    def __init__(self, value: int):
+        self.value = value
+
+    def __repr__(self):
+        return str(self.value)
+
+    def to_json(self):
+        return self.value
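`AccountId` normalizes addresses to an uppercase raw string (hashing and comparing on workchain + hash bytes), while `Asset` distinguishes plain TON from jetton values. A quick sketch using a zero address purely for illustration:

```python
a = AccountId('0:' + '0' * 64)                  # illustrative zero address
print(a.as_str())                               # uppercase raw form; None for 'addr_none'
ton = Asset(is_ton=True)                        # to_json() -> "TON"
jetton = Asset(is_ton=False, jetton_address=a)  # to_json() -> the jetton master address
```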
diff --git a/indexer/indexer/events/blocks/utils/tree_utils.py b/indexer/indexer/events/blocks/utils/tree_utils.py
new file mode 100644
index 0000000..cb4d6f4
--- /dev/null
+++ b/indexer/indexer/events/blocks/utils/tree_utils.py
@@ -0,0 +1,109 @@
+from __future__ import annotations
+
+import base64
+from collections import defaultdict
+from collections.abc import Iterable
+from typing import Callable
+
+from indexer.core.database import Transaction, Message, Trace, TraceEdge
+
+
+class NoMessageBodyException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EventNode:
+    def __init__(self, message: Message, children: list['EventNode'], is_tick_tock: bool = False,
+                 tick_tock_tx: Transaction = None):
+        self.message = message
+        self.is_tick_tock = is_tick_tock
+        self.tick_tock_tx = tick_tock_tx
+        self.parent = None
+        self.children = children
+        self.handled = False
+        if not is_tick_tock:
+            self.emulated = message.transaction.emulated
+            self.failed = message.transaction.aborted
+        else:
+            self.message = Message(msg_hash=tick_tock_tx.hash, direction='in', transaction=tick_tock_tx)
+            self.emulated = tick_tock_tx.emulated
+            self.failed = tick_tock_tx.aborted
+        if not is_tick_tock and message.message_content is None:
+            raise NoMessageBodyException(
+                "Message content is None for " + message.msg_hash + " - tx: " + message.tx_hash)
+
+    def get_type(self):
+        if self.message.destination is None:
+            return 'notification'
+        elif self.message.source is None:
+            return 'external'
+        else:
+            return 'internal'
+
+    def get_opcode(self):
+        if self.message.opcode is not None:
+            return self.message.opcode & 0xFFFFFFFF
+        else:
+            return None
+
+    def set_parent(self, parent: 'EventNode'):
+        self.parent = parent
+
+    def add_child(self, child: 'EventNode'):
+        self.children.append(child)
+        child.set_parent(self)
+
+    def get_tx_hash(self):
+        if self.is_tick_tock and self.tick_tock_tx is not None:
+            return self.tick_tock_tx.hash
+        elif self.message is not None:
+            return self.message.tx_hash
+        else:
+            return None
+
+
+    def get_lt(self):
+        if self.is_tick_tock and self.tick_tock_tx is not None:
+            return self.tick_tock_tx.lt
+        else:
+            return self.message.created_lt
+
+
+def to_tree(txs: list[Transaction]):
+    txs = sorted(txs, key=lambda tx: tx.lt, reverse=True)
+    msg_tx = {}
+    nodes = {}
+    def create_node(tx: Transaction) -> EventNode:
+        """Create an EventNode for a transaction and register its inbound message."""
+        message = next((m for m in tx.messages if m.direction == "in"), None)
+
+        if message is None and tx.descr == "tick_tock":
+            return EventNode(None, [], is_tick_tock=True, tick_tock_tx=tx)
+        msg_tx[message.msg_hash] = tx.hash
+        return EventNode(message, [])
+
+    for tx in txs:
+        if tx.hash not in nodes:
+            nodes[tx.hash] = create_node(tx)
+        for m in tx.messages:
+            if m.direction == "out":
+                if m.destination is None:
+                    nodes[tx.hash].add_child(EventNode(m, []))
+                else:
+                    assert m.msg_hash in msg_tx
+                    nodes[tx.hash].add_child(nodes[msg_tx[m.msg_hash]])
+    root = nodes[txs[-1].hash]
+    while root.parent is not None:
+        root = root.parent
+    return root
+
+
+
+def not_handled_nodes() -> Callable[[EventNode, int], bool]:
+    return lambda node, depth: not node.handled
+
+
+def with_opcode(opcodes: set[int]) -> Callable[[EventNode, int], bool]:
+    return lambda node, depth: node.get_opcode() in opcodes
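`to_tree` links transactions by message hash (children carry higher logical times, so the descending sort guarantees a child node exists before its parent references it) and returns the root `EventNode`. A depth-first walk sketch over one loaded trace:

```python
# Sketch: walk the message tree of a Trace loaded from the database.
root = to_tree(trace.transactions)
stack = [root]
while stack:
    node = stack.pop()
    print(node.get_tx_hash(), hex(node.get_opcode() or 0))
    stack.extend(node.children)
```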
diff --git a/indexer/indexer/events/context.py b/indexer/indexer/events/context.py
new file mode 100644
index 0000000..9ad5447
--- /dev/null
+++ b/indexer/indexer/events/context.py
@@ -0,0 +1,11 @@
+from contextvars import ContextVar
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import Session
+
+from indexer.events.interface_repository import InterfaceRepository
+
+# noinspection PyTypeChecker
+session: ContextVar[AsyncSession] = ContextVar('db_session', default=None)
+# noinspection PyTypeChecker
+interface_repository: ContextVar[InterfaceRepository] = ContextVar('interface_repository', default=None)
\ No newline at end of file
diff --git a/indexer/indexer/events/event_processing.py b/indexer/indexer/events/event_processing.py
new file mode 100644
index 0000000..70dd6ea
--- /dev/null
+++ b/indexer/indexer/events/event_processing.py
@@ -0,0 +1,75 @@
+from __future__ import annotations
+
+import base64
+import logging
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import sessionmaker
+
+from indexer.events.blocks.utils import AccountId
+from indexer.core.database import Trace, engine, TraceEdge
+from indexer.events.blocks.basic_blocks import TonTransferBlock, CallContractBlock
+from indexer.events.blocks.core import Block
+from indexer.events.blocks.auction import AuctionBidMatcher
+from indexer.events.blocks.dns import ChangeDnsRecordMatcher
+from indexer.events.blocks.elections import ElectionDepositStakeBlockMatcher, ElectionRecoverStakeBlockMatcher
+from indexer.events.blocks.jettons import JettonTransferBlockMatcher, JettonBurnBlockMatcher
+from indexer.events.blocks.messages import TonTransferMessage
+from indexer.events.blocks.nft import NftTransferBlockMatcher, TelegramNftPurchaseBlockMatcher, NftMintBlockMatcher
+from indexer.events.blocks.subscriptions import SubscriptionBlockMatcher, UnsubscribeBlockMatcher
+from indexer.events.blocks.swaps import DedustSwapBlockMatcher, StonfiSwapBlockMatcher
+from indexer.events.blocks.utils import NoMessageBodyException
+from indexer.events.blocks.utils import to_tree, EventNode
+
+async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
+logger = logging.getLogger(__name__)
+
+
+def init_block(node: EventNode) -> Block:
+    block = None
+    is_ton_transfer = (node.get_opcode() == 0 or node.get_opcode() is None or
+                       node.get_opcode() == TonTransferMessage.encrypted_opcode)
+    if node.is_tick_tock:
+        block = Block('tick_tock', [node], {'account': AccountId(node.tick_tock_tx.account)})
+    elif is_ton_transfer and node.message.destination is not None and node.message.source is not None:
+        block = TonTransferBlock(node)
+    else:
+        block = CallContractBlock(node)
+    for child in node.children:
+        block.connect(init_block(child))
+    return block
+
+
+matchers = [
+    NftMintBlockMatcher(),
+    JettonTransferBlockMatcher(),
+    JettonBurnBlockMatcher(),
+    DedustSwapBlockMatcher(),
+    StonfiSwapBlockMatcher(),
+    NftTransferBlockMatcher(),
+    TelegramNftPurchaseBlockMatcher(),
+    ChangeDnsRecordMatcher(),
+    ElectionDepositStakeBlockMatcher(),
+    ElectionRecoverStakeBlockMatcher(),
+    SubscriptionBlockMatcher(),
+    UnsubscribeBlockMatcher(),
+    AuctionBidMatcher()
+]
+
+
+async def process_event_async(trace: Trace) -> Block:
+    try:
+        node = to_tree(trace.transactions)
+        root = Block('root', [])
+        root.connect(init_block(node))
+
+        for m in matchers:
+            for b in root.bfs_iter():
+                if b.parent is None:
+                    await m.try_build(b)
+        return root
+    except NoMessageBodyException as e:
+        raise e
+    except Exception as e:
+        logging.error(f"Failed to process {trace.trace_id}")
+        raise e
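Both the database session and the interface repository flow through the `ContextVar`s in `context.py` above, so a caller must install them before invoking `process_event_async`. A minimal sketch, assuming `repository` is any ready `InterfaceRepository` instance:

```python
from indexer.events import context  # module introduced above

async def classify_one(trace, repository):
    # Sketch only: install the context vars this module reads, then match.
    async with async_session() as db:
        context.session.set(db)
        context.interface_repository.set(repository)
        return await process_event_async(trace)
```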
diff --git a/indexer/indexer/events/interface_repository.py b/indexer/indexer/events/interface_repository.py
new file mode 100644
index 0000000..03a462e
--- /dev/null
+++ b/indexer/indexer/events/interface_repository.py
@@ -0,0 +1,344 @@
+from __future__ import annotations
+
+import abc
+import asyncio
+from collections import defaultdict
+from contextvars import ContextVar
+
+import msgpack
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from indexer.core.database import JettonWallet, NFTItem, NftSale, NftAuction
+import redis
+
+
+class InterfaceRepository(abc.ABC):
+    @abc.abstractmethod
+    async def get_jetton_wallet(self, address: str) -> JettonWallet | None:
+        pass
+
+    @abc.abstractmethod
+    async def get_nft_item(self, address: str) -> NFTItem | None:
+        pass
+
+    @abc.abstractmethod
+    async def get_nft_sale(self, address: str) -> NftSale | None:
+        pass
+
+    @abc.abstractmethod
+    async def get_nft_auction(self, address: str) -> NftAuction | None:
+        pass
+
+    @abc.abstractmethod
+    async def get_interfaces(self, addresses: set[str]) -> dict[str, dict]:
+        pass
+
+
+class InMemoryInterfaceRepository(InterfaceRepository):
+    def __init__(self, interface_map: dict[str, dict[str, dict]], backoff_repository: InterfaceRepository):
+        self.interface_map = interface_map
+        self.backoff_repository = backoff_repository
+
+    async def get_jetton_wallet(self, address: str) -> JettonWallet | None:
+        if address in self.interface_map:
+            interfaces = self.interface_map[address]
+            for (interface_type, interface_data) in interfaces.items():
+                if interface_type == "JettonWallet":
+                    return JettonWallet(
+                        balance=interface_data["balance"],
+                        address=interface_data["address"],
+                        owner=interface_data["owner"],
+                        jetton=interface_data["jetton"],
+                    )
+        elif self.backoff_repository is not None:
+            return await self.backoff_repository.get_jetton_wallet(address)
+        return None
+
+    async def get_nft_item(self, address: str) -> NFTItem | None:
+        if address in self.interface_map:
+            interfaces = self.interface_map[address]
+            for (interface_type, interface_data) in interfaces.items():
+                if interface_type == "NftItem":
+                    return NFTItem(
+                        address=interface_data["address"],
+                        init=interface_data["init"],
+                        index=interface_data["index"],
+                        collection_address=interface_data["collection_address"],
+                        owner_address=interface_data["owner_address"],
+                        content=interface_data["content"],
+                    )
+        elif self.backoff_repository is not None:
+            return await self.backoff_repository.get_nft_item(address)
+        return None
+
+    async def get_nft_sale(self, address: str) -> NftSale | None:
+        if address in self.interface_map:
+            interfaces = self.interface_map[address]
+            for (interface_type, interface_data) in interfaces.items():
+                if interface_type == "NftSale":
+                    return NftSale(
+                        address=interface_data["address"],
+                        is_complete=interface_data["is_complete"],
+                        marketplace_address=interface_data["marketplace_address"],
+                        nft_address=interface_data["nft_address"],
+                        nft_owner_address=interface_data["nft_owner_address"],
+                        full_price=interface_data["full_price"],
+                    )
+        return None
+
+    async def get_nft_auction(self, address: str) -> NftAuction | None:
+        return None
+
+
+class SqlAlchemyInterfaceRepository(InterfaceRepository):
+    def __init__(self, session: ContextVar[AsyncSession]):
+        self.session = session
+
+    async def get_jetton_wallet(self, address: str) -> JettonWallet | None:
+        return await self.session.get().get(JettonWallet, address)
+
+    async def get_nft_item(self, address: str) -> NFTItem | None:
+        return await self.session.get().get(NFTItem, address)
+
+    async def get_nft_sale(self, address: str) -> NftSale | None:
+        return None
+
+    async def get_nft_auction(self, address: str) -> NftAuction | None:
+        return await self.session.get().get(NftAuction, address)
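The in-memory repository is meant to be layered over a slower source: lookups hit the prefetched map first and fall back to `backoff_repository` for unknown addresses. A wiring sketch, assuming `session_ctx` is the session `ContextVar` from `context.py` and `accounts`/`db` come from the caller:

```python
# Sketch: prefetch interfaces for one batch, fall back to SQL for the rest.
interfaces = await gather_interfaces(accounts, db)  # defined later in this module
repo = InMemoryInterfaceRepository(interfaces, SqlAlchemyInterfaceRepository(session_ctx))
wallet = await repo.get_jetton_wallet(address)      # None if not a known jetton wallet
```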
+class RedisInterfaceRepository(InterfaceRepository):
+    prefix = "I_"  # Prefix for keys in Redis
+
+    def __init__(self, connection: redis.Redis):
+        self.connection = connection
+
+    async def put_interfaces(self, interfaces: dict[str, dict[str, dict]]):
+        batch_size = 5000
+        serialized_interfaces = [(RedisInterfaceRepository.prefix + address, msgpack.packb(data, use_bin_type=True))
+                                 for (address, data) in interfaces.items() if len(data.keys()) > 0]
+        for i in range(0, len(serialized_interfaces), batch_size):
+            pipe = self.connection.pipeline()
+            for (key, value) in serialized_interfaces[i:i + batch_size]:
+                pipe.set(key, value, ex=300)
+            pipe.execute()
+
+    async def get_jetton_wallet(self, address: str) -> JettonWallet | None:
+        raw_data = self.connection.get(RedisInterfaceRepository.prefix + address)
+        if raw_data is None:
+            return None
+
+        interfaces = msgpack.unpackb(raw_data, raw=False)
+        interface_data = next((data for (interface_type, data) in interfaces.items() if interface_type == "JettonWallet"), None)
+        if interface_data is not None:
+            return JettonWallet(
+                balance=interface_data["balance"],
+                address=interface_data["address"],
+                owner=interface_data["owner"],
+                jetton=interface_data["jetton"],
+            )
+        return None
+
+    async def get_nft_item(self, address: str) -> NFTItem | None:
+        raw_data = self.connection.get(RedisInterfaceRepository.prefix + address)
+        if raw_data is None:
+            return None
+
+        interfaces = msgpack.unpackb(raw_data, raw=False)
+        interface_data = next((data for (interface_type, data) in interfaces.items() if interface_type == "NftItem"), None)
+        if interface_data is not None:
+            return NFTItem(
+                address=interface_data["address"],
+                init=interface_data["init"],
+                index=interface_data["index"],
+                collection_address=interface_data["collection_address"],
+                owner_address=interface_data["owner_address"],
+                content=interface_data["content"],
+            )
+        return None
+
+    async def get_nft_sale(self, address: str) -> NftSale | None:
+        raw_data = self.connection.get(RedisInterfaceRepository.prefix + address)
+        if raw_data is None:
+            return None
+
+        interfaces = msgpack.unpackb(raw_data, raw=False)
+        interface_data = next((data for (interface_type, data) in interfaces.items() if interface_type == "NftSale"), None)
+        if interface_data is not None:
+            return NftSale(
+                address=interface_data["address"],
+                is_complete=interface_data["is_complete"],
+                marketplace_address=interface_data["marketplace_address"],
+                nft_address=interface_data["nft_address"],
+                nft_owner_address=interface_data["nft_owner_address"],
+                full_price=interface_data["full_price"],
+            )
+        return None
+
+    async def get_interfaces(self, address: str) -> dict[str, dict]:
+        raw_data = self.connection.get(RedisInterfaceRepository.prefix + address)
+        if raw_data is None:
+            return {}
+
+        return msgpack.unpackb(raw_data, raw=False)
+
+    async def get_nft_auction(self, address: str) -> NftAuction | None:
+        raw_data = self.connection.get(RedisInterfaceRepository.prefix + address)
+        if raw_data is None:
+            return None
+
+        interfaces = msgpack.unpackb(raw_data, raw=False)
+        interface_data = next((data for (interface_type, data) in interfaces.items() if interface_type == "NftAuction"),
+                              None)
+        if interface_data is not None:
+            return NftAuction(
+                address=interface_data["address"],
+                nft_addr=interface_data["nft_addr"],
+                nft_owner=interface_data["nft_owner"],
+            )
+        return None
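Entries are written with a 300-second TTL (`ex=300`), so the Redis repository works as a short-lived cache in front of the database. A usage sketch inside an async context, with a placeholder DSN:

```python
import redis

r = redis.Redis.from_url('redis://localhost:6379')  # placeholder DSN
repo = RedisInterfaceRepository(r)
await repo.put_interfaces(await gather_interfaces(accounts, db))
sale = await repo.get_nft_sale(address)             # None on cache miss or expiry
```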
+class EmulatedTransactionsInterfaceRepository(InterfaceRepository):
+
+    def __init__(self, redis_hash: dict[str, bytes]):
+        self.data = redis_hash
+
+    async def get_jetton_wallet(self, address: str) -> JettonWallet | None:
+        raw_data = self.data.get(address)
+        if raw_data is None:
+            return None
+
+        data = msgpack.unpackb(raw_data, raw=False)
+        interfaces = data[0]
+        for (interface_type, interface_data) in interfaces:
+            if interface_type == 0:
+                return JettonWallet(
+                    balance=interface_data[0],
+                    address=interface_data[1],
+                    owner=interface_data[2],
+                    jetton=interface_data[3],
+                )
+        return None
+
+    async def get_nft_item(self, address: str) -> NFTItem | None:
+        raw_data = self.data.get(address)
+        if raw_data is None:
+            return None
+
+        data = msgpack.unpackb(raw_data, raw=False)
+        interfaces = data[0]
+        for (interface_type, interface_data) in interfaces:
+            if interface_type == 2:
+                return NFTItem(
+                    address=interface_data[0],
+                    init=interface_data[1],
+                    index=interface_data[2],
+                    collection_address=interface_data[3],
+                    owner_address=interface_data[4],
+                    content=interface_data[5],
+                )
+        return None
+
+    async def get_nft_sale(self, address: str) -> NftSale | None:
+        raw_data = self.data.get(address)
+        if raw_data is None:
+            return None
+
+        data = msgpack.unpackb(raw_data, raw=False)
+        interfaces = data[0]
+        for (interface_type, interface_data) in interfaces:
+            if interface_type == 4:
+                return NftSale(
+                    address=interface_data[0],
+                    is_complete=interface_data[1],
+                    marketplace_address=interface_data[3],
+                    nft_address=interface_data[4],
+                    nft_owner_address=interface_data[5],
+                    full_price=interface_data[6],
+                )
+        return None
+
+    async def get_nft_auction(self, address: str) -> NftAuction | None:
+        raw_data = self.data.get(address)
+        if raw_data is None:
+            return None
+
+        data = msgpack.unpackb(raw_data, raw=False)
+        interfaces = data[0]
+        for (interface_type, interface_data) in interfaces:
+            if interface_type == 5:
+                return NftAuction(
+                    address=interface_data[0],
+                    nft_addr=interface_data[4],
+                    nft_owner=interface_data[5],
+                )
+        return None
+
+    async def get_interfaces(self, address: str) -> dict[str, dict]:
+        return {}
+
+
+async def _gather_data_from_db(
+        accounts: set[str],
+        session: AsyncSession
+) -> tuple[list[JettonWallet], list[NFTItem], list[NftSale], list[NftAuction]]:
+    jetton_wallets = []
+    nft_items = []
+    nft_sales = []
+    getgems_auctions = []
+    account_list = list(accounts)
+    for i in range(0, len(account_list), 5000):
+        batch = account_list[i:i + 5000]
+        wallets = await session.execute(select(JettonWallet).filter(JettonWallet.address.in_(batch)))
+        nft = await session.execute(select(NFTItem).filter(NFTItem.address.in_(batch)))
+        sales = await session.execute(select(NftSale).filter(NftSale.address.in_(batch)))
+        auctions = await session.execute(select(NftAuction).filter(NftAuction.address.in_(batch)))
+        jetton_wallets += list(wallets.scalars().all())
+        nft_items += list(nft.scalars().all())
+        nft_sales += list(sales.scalars().all())
+        getgems_auctions += list(auctions.scalars().all())
+
+    return jetton_wallets, nft_items, nft_sales, getgems_auctions
+
+
+async def gather_interfaces(accounts: set[str], session: AsyncSession) -> dict[str, dict[str, dict]]:
+    result = defaultdict(dict)
+    (jetton_wallets, nft_items, nft_sales, nft_auctions) = await _gather_data_from_db(accounts, session)
+    for wallet in accounts:
+        result[wallet] = {}
+    for wallet in jetton_wallets:
+        result[wallet.address]["JettonWallet"] = {
+            "balance": float(wallet.balance),
+            "address": wallet.address,
+            "owner": wallet.owner,
+            "jetton": wallet.jetton,
+        }
+    for item in nft_items:
+        result[item.address]["NftItem"] = {
+            "address": item.address,
+            "init": item.init,
+            "index": float(item.index),
+            "collection_address": item.collection_address,
+            "owner_address": item.owner_address,
+            "content": item.content,
+        }
+    for sale in nft_sales:
+        result[sale.address]["NftSale"] = {
+            "address": sale.address,
+            "is_complete": sale.is_complete,
+            "marketplace_address": sale.marketplace_address,
+            "nft_address": sale.nft_address,
+            "nft_owner_address": sale.nft_owner_address,
+            "full_price": float(sale.full_price),
+        }
+    for auction in nft_auctions:
+        result[auction.address]["NftAuction"] = {
+            "address": auction.address,
+            "nft_addr": auction.nft_addr,
+            "nft_owner": auction.nft_owner
+        }
+    return result
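The emulated-trace repository reads positional msgpack tuples rather than named dicts; the type codes it checks are 0 (JettonWallet), 2 (NftItem), 4 (NftSale) and 5 (NftAuction). A round-trip sketch with placeholder values, packing one record the way the reader above unpacks it:

```python
import asyncio
import msgpack

# One JettonWallet record (type code 0) wrapped the way get_jetton_wallet
# expects: a one-element tuple holding a list of (type_code, fields) pairs.
payload = msgpack.packb(([(0, (1000, 'WALLET_ADDR', 'OWNER_ADDR', 'JETTON_MASTER'))],),
                        use_bin_type=True)
repo = EmulatedTransactionsInterfaceRepository({'WALLET_ADDR': payload})
wallet = asyncio.run(repo.get_jetton_wallet('WALLET_ADDR'))
assert wallet.owner == 'OWNER_ADDR'
```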
diff --git a/indexer/requirements.txt b/indexer/requirements.txt
index baec317..8d8a6d8 100644
--- a/indexer/requirements.txt
+++ b/indexer/requirements.txt
@@ -14,4 +14,12 @@ SQLAlchemy-Utils==0.41.1
 requests==2.31.0
 pytonlib==0.0.54
 uvloop==0.19.0
-httpx==0.25.2
\ No newline at end of file
+httpx==0.25.2
+redis<4.4
+hiredis
+tqdm
+pytoniq-core
+pymongo
+msgpack
+contextvars
+argparse
diff --git a/run_event_classifier.sh b/run_event_classifier.sh
new file mode 100755
index 0000000..55c4762
--- /dev/null
+++ b/run_event_classifier.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+set -e
+
+if [[ ! $# -eq "3" ]]; then
+    echo "Exactly 3 arguments required: container name, PostgreSQL and Redis connection strings."
+    exit 1
+fi
+
+CONTAINER_NAME=${1}
+PG_DSN=${2}
+REDIS_DSN=${3}
+
+docker build -t ton-index-event-classifier:devel -f indexer/classifier.Dockerfile indexer
+
+docker rm --force ${CONTAINER_NAME}
+docker run -e TON_INDEXER_PG_DSN="$PG_DSN" \
+           -e TON_INDEXER_REDIS_DSN="$REDIS_DSN" \
+           -e TQDM_NCOLS=0 \
+           -e TQDM_POSITION=-1 \
+           --restart unless-stopped \
+           --name ${CONTAINER_NAME} \
+           --network host \
+           --entrypoint /usr/local/bin/python \
+           -it ton-index-event-classifier:devel -- /app/event_classifier.py
diff --git a/swarm-deploy.sh b/swarm-deploy.sh
index 568265b..476584a 100755
--- a/swarm-deploy.sh
+++ b/swarm-deploy.sh
@@ -5,7 +5,8 @@ set -e
 DEPLOY_DATABASE=0
 MIGRATE=0
 BUILD=1
-DEPLOY_API=1
+DEPLOY_API=0
+DEPLOY_EVENTS=0
 BUILD_ARGS=
 POSITIONAL_ARGS=()
 
@@ -16,7 +17,8 @@ function usage() {
     echo ' -m --migrate      Run database migration'
     echo ' --no-build        Do not build docker images'
    echo ' --no-build-cache  No build cache'
-    echo ' --no-api          Do not deploy API'
+    echo ' --api             Deploy API'
+    echo ' --events          Deploy event classifier'
     echo ' -h --help         Show this message'
     exit
 }
@@ -28,7 +30,7 @@ while [[ $# -gt 0 ]]; do
             usage
             exit 1
             ;;
-        -d|--deploy-db)
+        -d|--db)
             DEPLOY_DATABASE=1
             shift
             ;;
@@ -44,8 +46,12 @@ while [[ $# -gt 0 ]]; do
             BUILD_ARGS=--no-cache
             shift
             ;;
-        --no-api)
-            DEPLOY_API=0
+        --api)
+            DEPLOY_API=1
+            shift
+            ;;
+        --events)
+            DEPLOY_EVENTS=1
             shift
             ;;
         -*|--*)
@@ -99,8 +105,8 @@ fi
 
 # build image
 if [[ $BUILD -eq "1" ]]; then
-    docker compose -f docker-compose.api.yaml build $BUILD_ARGS
-    docker compose -f docker-compose.api.yaml push
+    docker compose -f docker-compose.api.yaml -f docker-compose.events.yaml build $BUILD_ARGS
+    docker compose -f docker-compose.api.yaml -f docker-compose.events.yaml push
 fi
 
 if [[ $MIGRATE -eq "1" ]]; then
@@ -112,3 +118,8 @@ if [[ $DEPLOY_API -eq "1" ]]; then
     echo "Deploying API"
     docker stack deploy --with-registry-auth -c docker-compose.api.yaml ${STACK_NAME}
 fi
+
+if [[ $DEPLOY_EVENTS -eq "1" ]]; then
+    echo "Deploying event classifier"
+    docker stack deploy --with-registry-auth -c docker-compose.events.yaml ${STACK_NAME}
+fi
diff --git a/ton-index-cpp b/ton-index-cpp
index 87a94d6..393f717 160000
--- a/ton-index-cpp
+++ b/ton-index-cpp
@@ -1 +1 @@
-Subproject commit 87a94d6fd04678ceba6ba6aa85ce37f56aff4f8b
+Subproject commit 393f717e5b0e776d1279e208373e043ad60f5778
diff --git a/ton-index-go b/ton-index-go
new file mode 160000
index 0000000..2b0a8eb
--- /dev/null
+++ b/ton-index-go
@@ -0,0 +1 @@
+Subproject commit 2b0a8ebe10d16e11a4964f389f098d292a0837ba
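For reference, the new `run_event_classifier.sh` launcher takes exactly three positional arguments, e.g. `./run_event_classifier.sh event-classifier "postgresql+asyncpg://user:pass@host/ton_index" "redis://localhost:6379"` (both DSNs here are placeholders); it passes them into the container as `TON_INDEXER_PG_DSN` and `TON_INDEXER_REDIS_DSN` and runs `/app/event_classifier.py`.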