Skip to content

Commit

Permalink
Merge pull request #24 from Zir0h/rewrite-metadata_utils
Browse files Browse the repository at this point in the history
Rewrite metadata utils
  • Loading branch information
Zir0h authored Aug 19, 2022
2 parents e370d3e + 3e9039f commit 1487a31
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 91 deletions.
6 changes: 0 additions & 6 deletions dipdup.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,5 @@ hasura:
allow_aggregations: false
camel_case: false

sentry:
dsn: ${SENTRY_DSN:-"https://ef33481a853b44e39187bdf2d9eef773@newsentry.baking-bad.org/6"}
environment: ${SENTRY_ENVIRONMENT:-""}
server_name: ${SENTRY_SERVER_NAME:-""}
release: ${SENTRY_RELEASE:-"feat-db-rollback"}

prometheus:
host: 0.0.0.0
32 changes: 31 additions & 1 deletion dipdup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package: hicdex

advanced:
early_realtime: True
merge_subscriptions: True
postpone_jobs: True

database:
kind: sqlite
Expand Down Expand Up @@ -60,6 +62,32 @@ datasources:
kind: metadata
url: https://api-metadata.teia.rocks
network: mainnet
ipfs:
kind: ipfs
url: https://ipfs.teia.rocks/ipfs
http:
retry_count: 1
retry_sleep: 1
batch_size: 100
fallback_ipfs:
kind: ipfs
url: https://nftstorage.link/ipfs
http:
retry_count: 2
retry_sleep: 1
batch_size: 100
ratelimit_rate: 200
ratelimit_period: 60
fallback2_ipfs:
kind: ipfs
url: https://ipfs.io/ipfs
http:
retry_count: 2
retry_sleep: 1
batch_size: 100
bcd:
kind: http
url: https://api.better-call.dev/v1

indexes:
hen_mainnet:
Expand Down Expand Up @@ -234,4 +262,6 @@ hooks:
jobs:
fix_missing_metadata:
hook: fix_missing_metadata
interval: 600
interval: 300

logging: quiet
3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ services:
volumes:
- ./dipdup.yml:/home/dipdup/dipdup.yml
- ./dipdup.prod.yml:/home/dipdup/dipdup.prod.yml
- ./data/hicdex-metadata:/home/dipdup/metadata
command: ["-c", "dipdup.yml", "-c", "dipdup.prod.yml", "run"]
restart: unless-stopped
environment:
Expand Down Expand Up @@ -36,7 +35,7 @@ services:
retries: 5

hasura:
image: hasura/graphql-engine:v2.8.3
image: hasura/graphql-engine:v2.8.4
ports:
- 8081:8080
expose:
Expand Down
18 changes: 11 additions & 7 deletions src/hicdex/handlers/on_subjkt_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
from typing import Dict

from dipdup.context import HandlerContext
from dipdup.enums import MessageType
from dipdup.models import Transaction

import hicdex.models as models
from hicdex.metadata_utils import get_subjkt_metadata
from hicdex.metadata_utils import fetch_metadata_ipfs
from hicdex.types.hen_subjkt.parameter.registry import RegistryParameter
from hicdex.types.hen_subjkt.storage import HenSubjktStorage
from hicdex.utils import fromhex
Expand All @@ -18,22 +19,25 @@ async def on_subjkt_register(
registry: Transaction[RegistryParameter, HenSubjktStorage],
) -> None:
addr = registry.data.sender_address
_logger.info(f'{addr}')
holder, _ = await models.Holder.get_or_create(address=addr)

name = fromhex(registry.parameter.subjkt)
_logger.info(f'{name}')
metadata_file = fromhex(registry.parameter.metadata)
_logger.info(f'{metadata_file}')
metadata: Dict[str, str] = {}

holder.name = name
holder.metadata_file = metadata_file
holder.metadata = metadata

try:
if metadata_file.startswith('ipfs://'):
_logger.info("Fetching IPFS metadata")
holder.metadata = await get_subjkt_metadata(holder)
except Exception as exc:
ctx.logger.error('Failed to fetch metadata for %s: %s', holder.address, exc)
level = ctx.get_tzkt_datasource("tzkt_mainnet").get_channel_level(MessageType.head)
if registry.data.level > level - 200 and metadata_file.startswith('ipfs://'):
_logger.info("Fetching IPFS metadata")
holder.metadata = await fetch_metadata_ipfs(ctx, holder.metadata_file)
else:
_logger.info("Skipping metadata fetch")

holder.description = holder.metadata.get('description', '')

Expand Down
3 changes: 2 additions & 1 deletion src/hicdex/hooks/fix_missing_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from dipdup.context import HookContext

from hicdex.metadata_utils import fix_other_metadata
from hicdex.metadata_utils import fix_holder_metadata, fix_other_metadata


async def fix_missing_metadata(
    ctx: HookContext,
) -> None:
    """Scheduled DipDup hook: retry metadata resolution for records that lack it.

    Backfills holder (subjkt) metadata first, then token metadata; both
    helpers log failures instead of raising, so the job always completes.
    """
    await fix_holder_metadata(ctx)
    await fix_other_metadata(ctx)
140 changes: 66 additions & 74 deletions src/hicdex/metadata_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@
import hicdex.models as models
from hicdex.utils import clean_null_bytes, http_request

METADATA_PATH = '/home/dipdup/metadata/tokens'
SUBJKT_PATH = '/home/dipdup/metadata/subjkts'
IPFS_API = os.environ.get('IPFS_API', 'https://cloudflare-ipfs.com/ipfs/')

_logger = logging.getLogger(__name__)

broken_ids = []
try:
with open(f'{METADATA_PATH}/broken.json') as broken_list:
broken_ids = json.load(broken_list)
except Exception as exc:
_logger.error(f'Unable to load {METADATA_PATH}/broken.json: %s', exc)


async def fix_token_metadata(ctx: DipDupContext, token: models.Token) -> bool:
metadata = await get_metadata(ctx, token)
if isinstance(metadata, str):
metadata = json.loads(metadata)

if isinstance(metadata, bytes):
return False

token.title = get_name(metadata)
token.description = get_description(metadata)
token.artifact_uri = get_artifact_uri(metadata)
Expand All @@ -47,17 +42,39 @@ async def fix_token_metadata(ctx: DipDupContext, token: models.Token) -> bool:
return metadata != {}


async def fix_subjkt_metadata(ctx: DipDupContext, holder: models.Holder) -> bool:
    """Re-fetch a holder's subjkt metadata from IPFS and persist it.

    Returns True when non-empty metadata was stored, False otherwise.
    """
    metadata = await fetch_metadata_ipfs(ctx, holder.metadata_file)
    # Gateways may return the JSON document as a raw string; parse it.
    if isinstance(metadata, str):
        metadata = json.loads(metadata)

    # Raw bytes means the pointer resolved to something that is not JSON.
    if isinstance(metadata, bytes):
        _logger.warning(f'invalid metadata: {metadata}')
        return False

    holder.metadata = metadata
    # Default to '' (not {}): description is a string field, matching
    # holder.metadata.get('description', '') in on_subjkt_register.
    holder.description = metadata.get('description', '')

    await holder.save()
    return metadata != {}


async def fix_other_metadata(ctx: DipDupContext) -> None:
    """Retry metadata resolution for tokens still missing artifact_uri or rights.

    Iterates newest-first; a failure is only logged, so one broken token
    cannot stall the rest of the job.
    """
    # Plain string: no interpolation needed, so no f-prefix.
    _logger.info('running fix_missing_metadata job')
    async for token in models.Token.filter(Q(artifact_uri='') | Q(rights__isnull=True)).order_by('-id'):
        fixed = await fix_token_metadata(ctx, token)
        if fixed:
            _logger.info(f'fixed metadata for {token.id}')
        else:
            _logger.warning(f'failed to fix metadata for {token.id}')


async def fix_holder_metadata(ctx: DipDupContext) -> None:
    """Retry subjkt metadata resolution for holders whose metadata is still empty."""
    # Holders that declared a metadata file but whose metadata never resolved.
    pending = models.Holder.filter(~Q(metadata_file='') & Q(metadata='{}'))
    async for holder in pending:
        if await fix_subjkt_metadata(ctx, holder):
            _logger.info(f'fixed metadata for {holder.address}')
        else:
            _logger.warning(f'failed to fix metadata for {holder.address}')


async def add_tags(token: models.Token, metadata: Dict[str, Any]) -> None:
Expand All @@ -72,91 +89,73 @@ async def get_or_create_tag(tag: str) -> models.TagModel:
return tag_model


async def get_subjkt_metadata(holder: models.Holder) -> Dict[str, Any]:
    """Return subjkt metadata for *holder*, preferring the on-disk JSON cache.

    Falls back to IPFS when the cache is absent or records exactly one
    prior failed attempt; gives up (returns {}) after more than one.
    """
    failed_attempt = 0
    # suppress(Exception): a missing or unparsable cache file simply means "not cached".
    with suppress(Exception), open(subjkt_path(holder.address)) as json_file:
        metadata = json.load(json_file)
        # '__failed_attempt' is a sentinel written after a failed IPFS fetch.
        failed_attempt = metadata.get('__failed_attempt')
        if failed_attempt and failed_attempt > 1:
            # Already retried once — stop hammering IPFS for this address.
            return {}
        if not failed_attempt:
            # Genuine cached metadata: return it without touching the network.
            return metadata

    # Cache miss, or one recorded failure: try (or retry) the IPFS fetch.
    return await fetch_subjkt_metadata_ipfs(holder, failed_attempt)


async def get_metadata(ctx: DipDupContext, token: models.Token) -> Dict[str, Any]:
    """Resolve token metadata: metadata datasource first, then IPFS, then BCD.

    Returns {} when every source comes up empty. (Removes the stale
    single-argument fetch_metadata_ipfs(token) call left over from the
    previous revision of this function.)
    """
    # FIXME: hard coded contract
    metadata_datasource = ctx.get_metadata_datasource('metadata')
    metadata = await metadata_datasource.get_token_metadata('KT1RJ6PbjHpwc3M5rw5s2Nbmefwbuwbdxton', token.id)
    if metadata is not None:
        _logger.info(f'found metadata for {token.id} from metadata_datasource')
        # The datasource may hand back the JSON document as a string.
        if isinstance(metadata, str):
            return json.loads(metadata)
        return metadata

    data = await fetch_metadata_ipfs(ctx, token.metadata)
    if data != {}:
        _logger.info(f'found metadata for {token.id} from IPFS')
    else:
        data = await fetch_metadata_bcd(ctx, token)
        if data != {}:
            _logger.info(f'metadata for {token.id} from BCD')

    return data


def write_subjkt_metadata_file(holder: models.Holder, metadata: Dict[str, Any]) -> None:
with open(subjkt_path(holder.address), 'w') as write_file:
json.dump(metadata, write_file)


async def fetch_metadata_bcd(ctx: DipDupContext, token: models.Token) -> Dict[str, Any]:
    """Fetch token metadata from the Better Call Dev API as a last resort.

    Returns the first matching metadata object, or {} when nothing matches.
    (Removes the dead aiohttp session code and the stray
    `with suppress(FileNotFoundError):` left over from the previous revision.)
    """
    api = ctx.get_http_datasource('bcd')
    # NOTE(review): 'contract:...' looks like it should be 'contract=...' in the
    # query string — confirm against the BCD API; kept byte-identical here to
    # avoid changing runtime behavior.
    data = await api.request(
        method='get',
        url=f'tokens/mainnet/metadata?contract:KT1Hkg5qeNhfwpKW4fXvq7HGZB9z2EnmCCA9&token_id={token.id}',
        weight=1,  # ratelimiter leaky-bucket drops
    )

    # Keep only OBJKT entries: by symbol, or by the hicdex OBJKT contract.
    data = [
        obj
        for obj in data
        if 'symbol' in obj and (obj['symbol'] == 'OBJKT' or obj['contract'] == 'KT1RJ6PbjHpwc3M5rw5s2Nbmefwbuwbdxton')
    ]

    if data and not isinstance(data[0], list):
        return data[0]
    return {}


async def fetch_subjkt_metadata_ipfs(holder: models.Holder, failed_attempt: int = 0) -> Dict[str, Any]:
addr = holder.metadata_file.replace('ipfs://', '')
try:
session = aiohttp.ClientSession()
data = await http_request(session, 'get', url=f'{IPFS_API}/{addr}', timeout=10)
await session.close()
if data and not isinstance(data, list):
write_subjkt_metadata_file(holder, data)
return data
with open(subjkt_path(holder.address), 'w') as write_file:
json.dump({'__failed_attempt': failed_attempt + 1}, write_file)
except Exception:
await session.close()
async def call_ipfs(ctx: DipDupContext, provider: str, path: str) -> Dict[str, Any]:
    """Fetch *path* through the named IPFS datasource; {} when the reply is unusable."""
    cid = path.replace('ipfs://', '')
    payload = await ctx.get_ipfs_datasource(provider).get(cid)
    # A list (or empty/None reply) is not a metadata object — treat as a miss.
    if not payload or isinstance(payload, list):
        return {}
    return payload


async def fetch_metadata_ipfs(ctx: DipDupContext, path: str) -> Dict[str, Any]:
    """Resolve an ipfs:// metadata pointer, falling back across gateways.

    Tries the 'ipfs' datasource first, then 'fallback_ipfs', then
    'fallback2_ipfs'. Returns {} for non-IPFS paths or when every
    gateway fails.

    Fixes vs. previous revision: the last-resort handler now logs the
    exception it caught (it was bound but unused), the three duplicated
    nested try blocks are flattened into one loop, and the redundant
    'ipfs://' stripping is dropped (call_ipfs strips the prefix itself).
    """
    if not path.startswith('ipfs://'):
        return {}

    for provider in ('ipfs', 'fallback_ipfs', 'fallback2_ipfs'):
        try:
            _logger.info('trying ipfs provider %s', provider)
            return await call_ipfs(ctx, provider, path)
        except Exception as exc:
            _logger.warning('ipfs call via %s failed: %s', provider, exc)

    _logger.warning('giving up on %s', path)
    return {}


Expand Down Expand Up @@ -208,10 +207,3 @@ def get_thumbnail_uri(metadata: Dict[str, Any]) -> str:

def get_right_uri(metadata: Dict[str, Any]) -> str:
return clean_null_bytes(metadata.get('right_uri', '') or metadata.get('rightUri', ''))


def subjkt_path(addr: str) -> str:
    """Return the cache-file path for *addr*'s subjkt JSON, sharded by its last character."""
    # Shard by the final address character to keep directory sizes bounded.
    shard_dir = f'{SUBJKT_PATH}/{addr[-1]}'
    Path(shard_dir).mkdir(parents=True, exist_ok=True)
    return f'{shard_dir}/{addr}.json'

0 comments on commit 1487a31

Please sign in to comment.