diff --git a/src/index.rs b/src/index.rs index 412a18cc0c..bcee2822c5 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1723,7 +1723,7 @@ impl Index { Utc::now() .round_subsecs(0) .checked_add_signed(chrono::Duration::seconds( - 10 * 60 * i64::try_from(expected_blocks)?, + 10 * 60 * i64::from(expected_blocks), )) .ok_or_else(|| anyhow!("block timestamp out of range"))?, )) diff --git a/src/index/updater.rs b/src/index/updater.rs index 6d205255d0..49e32e6bed 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -516,7 +516,7 @@ impl<'index> Updater<'_> { coinbase_inputs.extend(input_sat_ranges); } - if let Some((tx, txid)) = block.txdata.get(0) { + if let Some((tx, txid)) = block.txdata.first() { self.index_transaction_sats( tx, *txid, diff --git a/src/index/updater/inscription_updater.rs b/src/index/updater/inscription_updater.rs index 71317df267..9b76618956 100644 --- a/src/index/updater/inscription_updater.rs +++ b/src/index/updater/inscription_updater.rs @@ -22,6 +22,7 @@ pub(super) struct Flotsam { origin: Origin, } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] enum Origin { New { diff --git a/src/index/updater/inscription_updater/stream.rs b/src/index/updater/inscription_updater/stream.rs index 1dd0ce2486..2127deab15 100644 --- a/src/index/updater/inscription_updater/stream.rs +++ b/src/index/updater/inscription_updater/stream.rs @@ -1,6 +1,9 @@ use crate::subcommand::traits::Output; use base64::{engine::general_purpose, Engine as _}; use log::{error, warn}; +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::{Duration, Instant}; use super::*; use rdkafka::{ @@ -13,6 +16,11 @@ use std::str::FromStr; lazy_static! { static ref CLIENT: StreamClient = StreamClient::new(); + static ref OLD_OWNER_CACHE: Cache> = + Cache::new(Some(Duration::from_secs(60))); + static ref IS_BRC20: bool = env::var("KAFKA_TOPIC") + .map(|kafka_topic| kafka_topic.to_lowercase().contains("brc20")) + .unwrap_or(false); } struct StreamClient { @@ -290,32 +298,42 @@ impl StreamEvent { pub(crate) fn with_transfer(&mut self, old_satpoint: SatPoint, index: &Index) -> &mut Self { self.old_location = Some(old_satpoint); - self.old_owner = index - .get_transaction(old_satpoint.outpoint.txid) - .unwrap_or(None) - .and_then(|tx| { - tx.output - .get(old_satpoint.outpoint.vout as usize) - .and_then(|txout| { - Address::from_script(&txout.script_pubkey, StreamEvent::get_network()) - .map_err(|e| { - warn!( - "StreamEvent::with_transfer could not parse old_owner address: {}", - e - ); - }) - .ok() - }) - }); - match index - .get_inscription_by_id_unsafe(self.inscription_id) - .unwrap_or(None) - { - Some(inscription) => { - self.enrich_content(inscription); - } - None => { - warn!("could not find inscription for id {}", self.inscription_id); + + let txid = old_satpoint.outpoint.txid; + self.old_owner = OLD_OWNER_CACHE.get(&txid).unwrap_or_else(|| { + let old_owner = index + .get_transaction(old_satpoint.outpoint.txid) + .unwrap_or(None) + .and_then(|tx| { + tx.output + .get(old_satpoint.outpoint.vout as usize) + .and_then(|txout| { + Address::from_script(&txout.script_pubkey, StreamEvent::get_network()) + .map_err(|e| { + warn!( + "StreamEvent::with_transfer could not parse old_owner address: {}", + e + ); + }) + .ok() + }) + }); + OLD_OWNER_CACHE.set(txid, old_owner.clone()); + old_owner + }); + + // Only enrich the inscription if it is a BRC20 transfer + if *IS_BRC20 { + match index + .get_inscription_by_id_unsafe(self.inscription_id) + .unwrap_or(None) + { + Some(inscription) => { + self.enrich_content(inscription); + } + None => { + warn!("could not find inscription for id {}", self.inscription_id); + } } } self @@ -357,6 +375,14 @@ impl StreamEvent { if env::var("KAFKA_TOPIC").is_err() { return Ok(()); } + + // skip brc20 sats mint transfer events + if let Some(brc20) = &self.brc20 { + if brc20.tick == "sats" && brc20.op == "mint" && self.old_owner.is_some() { + return Ok(()); + } + } + let key = self.key(); let payload = serde_json::to_vec(&self)?; let record = BaseRecord::to(&CLIENT.topic).key(&key).payload(&payload); @@ -389,3 +415,53 @@ impl StreamEvent { Ok(()) } } + +pub struct CacheEntry { + value: V, + expires_at: Instant, +} + +pub struct Cache { + map: Mutex>>, + ttl: Duration, +} + +impl Cache +where + K: Eq + std::hash::Hash + Clone, + V: Clone, +{ + fn new(ttl: Option) -> Cache { + let ttl = ttl.unwrap_or(Duration::from_secs(60)); // Default to 1 minute + Cache { + map: Mutex::new(HashMap::new()), + ttl, + } + } + + pub fn set(&self, key: K, value: V) { + let mut map = self.map.lock().unwrap(); + let entry = CacheEntry { + value, + expires_at: Instant::now() + self.ttl, + }; + map.insert(key, entry); + + let now = Instant::now(); + map.retain(|_, v| v.expires_at > now); // Clean up expired entries + } + + pub fn get(&self, key: &K) -> Option { + let now = Instant::now(); + let mut map = self.map.lock().unwrap(); + + if let Some(entry) = map.get(key) { + if entry.expires_at > now { + return Some(entry.value.clone()); + } else { + map.remove(key); + } + } + None + } +} diff --git a/src/lib.rs b/src/lib.rs index 7ee041eace..a89f09ec0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,8 +83,9 @@ use { tokio::{runtime::Runtime, task}, }; -pub use crate::{ +pub use self::{ block_rarity::BlockRarity, + envelope::Envelope, fee_rate::FeeRate, index::Index, inscription::Inscription, diff --git a/src/sat.rs b/src/sat.rs index ec61c99f81..6c15ae1931 100644 --- a/src/sat.rs +++ b/src/sat.rs @@ -631,7 +631,6 @@ mod tests { case(0); case(1); - case(50 * COIN_VALUE - 1); case(50 * COIN_VALUE); case(50 * COIN_VALUE + 1); case(2067187500000000 - 1); diff --git a/src/subcommand/server/rpc.rs b/src/subcommand/server/rpc.rs index 428fcca65f..1ed3bbb789 100644 --- a/src/subcommand/server/rpc.rs +++ b/src/subcommand/server/rpc.rs @@ -57,7 +57,6 @@ async fn get_sat_ranges(value: JsonRpcExtractor, index: Arc) -> JrpcResul block_rarities: Vec, block_height: Height, block_hash: Option, - block_time: i64, } #[derive(Serialize)] @@ -120,7 +119,6 @@ async fn get_sat_ranges(value: JsonRpcExtractor, index: Arc) -> JrpcResul block_rarities, block_height, block_hash: index.block_hash(Some(block_height.n())).unwrap(), - block_time: index.block_time(block_height).unwrap().unix_timestamp(), }); sat_ranges.push(range); } diff --git a/src/templates/blocks.rs b/src/templates/blocks.rs index 52cc492e5a..570b506c72 100644 --- a/src/templates/blocks.rs +++ b/src/templates/blocks.rs @@ -14,7 +14,7 @@ impl BlocksHtml { ) -> Self { Self { last: blocks - .get(0) + .first() .map(|(height, _)| height) .cloned() .unwrap_or(0),