Skip to content

Commit

Permalink
add cache to store old owner for stream event (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
destinymagicchen authored and mi-yu committed Mar 12, 2024
1 parent 1cecb7b commit 8618357
Showing 1 changed file with 102 additions and 26 deletions.
128 changes: 102 additions & 26 deletions src/index/updater/inscription_updater/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::subcommand::traits::Output;
use base64::{engine::general_purpose, Engine as _};
use log::{error, warn};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use super::*;
use rdkafka::{
Expand All @@ -13,6 +16,11 @@ use std::str::FromStr;

lazy_static! {
static ref CLIENT: StreamClient = StreamClient::new();
static ref OLD_OWNER_CACHE: Cache<Txid, Option<Address>> =
Cache::new(Some(Duration::from_secs(60)));
static ref IS_BRC20: bool = env::var("KAFKA_TOPIC")
.map(|kafka_topic| kafka_topic.to_lowercase().contains("brc20"))
.unwrap_or(false);
}

struct StreamClient {
Expand Down Expand Up @@ -290,32 +298,42 @@ impl StreamEvent {

pub(crate) fn with_transfer(&mut self, old_satpoint: SatPoint, index: &Index) -> &mut Self {
self.old_location = Some(old_satpoint);
self.old_owner = index
.get_transaction(old_satpoint.outpoint.txid)
.unwrap_or(None)
.and_then(|tx| {
tx.output
.get(old_satpoint.outpoint.vout as usize)
.and_then(|txout| {
Address::from_script(&txout.script_pubkey, StreamEvent::get_network())
.map_err(|e| {
warn!(
"StreamEvent::with_transfer could not parse old_owner address: {}",
e
);
})
.ok()
})
});
match index
.get_inscription_by_id_unsafe(self.inscription_id)
.unwrap_or(None)
{
Some(inscription) => {
self.enrich_content(inscription);
}
None => {
warn!("could not find inscription for id {}", self.inscription_id);

let txid = old_satpoint.outpoint.txid;
self.old_owner = OLD_OWNER_CACHE.get(&txid).unwrap_or_else(|| {
let old_owner = index
.get_transaction(old_satpoint.outpoint.txid)
.unwrap_or(None)
.and_then(|tx| {
tx.output
.get(old_satpoint.outpoint.vout as usize)
.and_then(|txout| {
Address::from_script(&txout.script_pubkey, StreamEvent::get_network())
.map_err(|e| {
warn!(
"StreamEvent::with_transfer could not parse old_owner address: {}",
e
);
})
.ok()
})
});
OLD_OWNER_CACHE.set(txid, old_owner.clone());
old_owner
});

// Only enrich the inscription if it is a BRC20 transfer
if *IS_BRC20 {
match index
.get_inscription_by_id_unsafe(self.inscription_id)
.unwrap_or(None)
{
Some(inscription) => {
self.enrich_content(inscription);
}
None => {
warn!("could not find inscription for id {}", self.inscription_id);
}
}
}
self
Expand Down Expand Up @@ -357,6 +375,14 @@ impl StreamEvent {
if env::var("KAFKA_TOPIC").is_err() {
return Ok(());
}

// skip brc20 sats mint transfer events
if let Some(brc20) = &self.brc20 {
if brc20.tick == "sats" && brc20.op == "mint" && self.old_owner.is_some() {
return Ok(());
}
}

let key = self.key();
let payload = serde_json::to_vec(&self)?;
let record = BaseRecord::to(&CLIENT.topic).key(&key).payload(&payload);
Expand Down Expand Up @@ -389,3 +415,53 @@ impl StreamEvent {
Ok(())
}
}

pub struct CacheEntry<V> {
value: V,
expires_at: Instant,
}

pub struct Cache<K, V> {
map: Mutex<HashMap<K, CacheEntry<V>>>,
ttl: Duration,
}

impl<K, V> Cache<K, V>
where
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
fn new(ttl: Option<Duration>) -> Cache<K, V> {
let ttl = ttl.unwrap_or(Duration::from_secs(60)); // Default to 1 minute
Cache {
map: Mutex::new(HashMap::new()),
ttl,
}
}

pub fn set(&self, key: K, value: V) {
let mut map = self.map.lock().unwrap();
let entry = CacheEntry {
value,
expires_at: Instant::now() + self.ttl,
};
map.insert(key, entry);

let now = Instant::now();
map.retain(|_, v| v.expires_at > now); // Clean up expired entries
}

pub fn get(&self, key: &K) -> Option<V> {
let now = Instant::now();
let mut map = self.map.lock().unwrap();

if let Some(entry) = map.get(key) {
if entry.expires_at > now {
return Some(entry.value.clone());
} else {
map.remove(key);
}
}
None
}
}

0 comments on commit 8618357

Please sign in to comment.