From 299ed0a19aa4d8d489ee49b233f21e19133ebc5a Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Tue, 30 Apr 2024 15:06:39 -0600 Subject: [PATCH] add _partition col and manual fix logic to transfers models --- .../curated/nft/silver__nft_transfers.sql | 19 +++-- .../curated/silver__token_transfers.sql | 69 +++++++++++++------ 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/models/silver/curated/nft/silver__nft_transfers.sql b/models/silver/curated/nft/silver__nft_transfers.sql index 90a946b6..dba941a2 100644 --- a/models/silver/curated/nft/silver__nft_transfers.sql +++ b/models/silver/curated/nft/silver__nft_transfers.sql @@ -18,12 +18,16 @@ WITH actions_events AS ( receiver_id, logs, _inserted_timestamp, - modified_timestamp as _modified_timestamp + modified_timestamp as _modified_timestamp, + _partition_by_block_number FROM {{ ref('silver__actions_events_function_call_s3') }} WHERE receipt_succeeded = TRUE AND logs [0] IS NOT NULL + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} {% if is_incremental() %} AND modified_timestamp >= ( SELECT @@ -32,6 +36,7 @@ WITH actions_events AS ( {{ this }} ) {% endif %} + {% endif %} ), -------------------------------- NFT Transfers -------------------------------- @@ -46,7 +51,8 @@ nft_logs AS ( receiver_id AS contract_id, b.index as logs_rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM actions_events JOIN LATERAL FLATTEN( @@ -78,7 +84,8 @@ nft_transfers AS ( token_ids [0] :: STRING AS token_id, logs_rn + A.index as transfer_rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM nft_logs JOIN LATERAL FLATTEN( @@ -100,7 +107,8 @@ nft_final AS ( B.value :: STRING AS token_id, transfer_rn + B.index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM nft_transfers JOIN LATERAL FLATTEN( @@ -119,7 +127,8 @@ FINAL AS ( to_address, token_id, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM nft_final ) diff --git a/models/silver/curated/silver__token_transfers.sql b/models/silver/curated/silver__token_transfers.sql index c07e3142..d562c34b 100644 --- a/models/silver/curated/silver__token_transfers.sql +++ b/models/silver/curated/silver__token_transfers.sql @@ -24,12 +24,16 @@ WITH actions_events AS ( logs, receipt_succeeded, _inserted_timestamp, - modified_timestamp as _modified_timestamp + modified_timestamp as _modified_timestamp, + _partition_by_block_number FROM {{ ref('silver__actions_events_function_call_s3') }} WHERE receipt_succeeded = TRUE AND logs [0] IS NOT NULL + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} {% if is_incremental() %} AND _modified_timestamp >= ( SELECT @@ -38,6 +42,7 @@ WITH actions_events AS ( {{ this }} ) {% endif %} + {% endif %} ), swaps_raw AS ( SELECT @@ -53,18 +58,23 @@ swaps_raw AS ( amount_in_raw, amount_out_raw, _inserted_timestamp, - modified_timestamp AS _modified_timestamp + modified_timestamp AS _modified_timestamp, + _partition_by_block_number FROM {{ ref('silver__dex_swaps_v2') }} - {% if is_incremental() %} - WHERE - _modified_timestamp >= ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} - ) - {% endif %} + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} ), ---------------------------- Native Token Transfers ------------------------------ native_transfers AS ( @@ -80,11 +90,15 @@ native_transfers AS ( --numeric validation (there are some exceptions that needs to be ignored) receipt_succeeded, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM {{ ref('silver__transfers_s3') }} WHERE status = TRUE AND deposit != 0 + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} {% if is_incremental() %} AND inserted_timestamp >= ( SELECT @@ -93,6 +107,7 @@ native_transfers AS ( {{ this }} ) {% endif %} + {% endif %} ), ------------------------------ NEAR Tokens (NEP 141) -------------------------------- swaps AS ( @@ -108,7 +123,8 @@ swaps AS ( 'swap' AS memo, swap_index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM swaps_raw UNION ALL @@ -124,7 +140,8 @@ swaps AS ( 'swap' AS memo, swap_index + 1 as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM swaps_raw ), @@ -139,7 +156,8 @@ orders AS ( DATA :event :: STRING AS event, g.index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM actions_events JOIN LATERAL FLATTEN( @@ -163,7 +181,8 @@ orders_final AS ( 'order' AS memo, f.index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM orders JOIN LATERAL FLATTEN( @@ -199,7 +218,8 @@ add_liquidity AS ( 'add_liquidity' AS memo, index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM actions_events, LATERAL FLATTEN ( @@ -224,7 +244,8 @@ ft_transfers_mints AS ( b.index as logs_rn, receiver_id AS contract_address, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM actions_events JOIN LATERAL FLATTEN( @@ -255,7 +276,8 @@ ft_transfers_mints_final AS ( f.value :memo :: STRING AS memo, logs_rn + f.index as rn, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM ft_transfers_mints JOIN LATERAL FLATTEN( @@ -302,7 +324,8 @@ native_final AS ( amount_unadjusted :: STRING AS amount_raw, amount_unadjusted :: FLOAT AS amount_raw_precise, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM native_transfers ), @@ -322,7 +345,8 @@ nep_final AS ( amount_unadjusted :: STRING AS amount_raw, amount_unadjusted :: FLOAT AS amount_raw_precise, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM nep_transfers ), @@ -354,7 +378,8 @@ FINAL AS ( amount_raw_precise, transfer_type, _inserted_timestamp, - _modified_timestamp + _modified_timestamp, + _partition_by_block_number FROM transfer_union