Skip to content

Commit

Permalink
add cache to store old owner for stream event (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
destinymagicchen authored and mi-yu committed Dec 30, 2023
1 parent 9d62bbc commit 1d27e0e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ impl Index {
Utc::now()
.round_subsecs(0)
.checked_add_signed(chrono::Duration::seconds(
10 * 60 * i64::try_from(expected_blocks)?,
10 * 60 * i64::from(expected_blocks),
))
.ok_or_else(|| anyhow!("block timestamp out of range"))?,
))
Expand Down
2 changes: 1 addition & 1 deletion src/index/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl<'index> Updater<'_> {
coinbase_inputs.extend(input_sat_ranges);
}

if let Some((tx, txid)) = block.txdata.get(0) {
if let Some((tx, txid)) = block.txdata.first() {
self.index_transaction_sats(
tx,
*txid,
Expand Down
1 change: 1 addition & 0 deletions src/index/updater/inscription_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(super) struct Flotsam {
origin: Origin,
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
enum Origin {
New {
Expand Down
128 changes: 102 additions & 26 deletions src/index/updater/inscription_updater/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::subcommand::traits::Output;
use base64::{engine::general_purpose, Engine as _};
use log::{error, warn};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use super::*;
use rdkafka::{
Expand All @@ -13,6 +16,11 @@ use std::str::FromStr;

lazy_static! {
// Shared Kafka stream client, constructed lazily on first access.
static ref CLIENT: StreamClient = StreamClient::new();
// Short-lived cache of previous-owner lookups keyed by the outpoint's txid,
// so repeated transfers from the same transaction avoid refetching it.
// Entries expire after 60 seconds.
static ref OLD_OWNER_CACHE: Cache<Txid, Option<Address>> =
Cache::new(Some(Duration::from_secs(60)));
// True when the KAFKA_TOPIC env var contains "brc20" (case-insensitive);
// false when the variable is unset or does not mention brc20.
static ref IS_BRC20: bool = env::var("KAFKA_TOPIC")
.map(|kafka_topic| kafka_topic.to_lowercase().contains("brc20"))
.unwrap_or(false);
}

struct StreamClient {
Expand Down Expand Up @@ -290,32 +298,42 @@ impl StreamEvent {

pub(crate) fn with_transfer(&mut self, old_satpoint: SatPoint, index: &Index) -> &mut Self {
self.old_location = Some(old_satpoint);
self.old_owner = index
.get_transaction(old_satpoint.outpoint.txid)
.unwrap_or(None)
.and_then(|tx| {
tx.output
.get(old_satpoint.outpoint.vout as usize)
.and_then(|txout| {
Address::from_script(&txout.script_pubkey, StreamEvent::get_network())
.map_err(|e| {
warn!(
"StreamEvent::with_transfer could not parse old_owner address: {}",
e
);
})
.ok()
})
});
match index
.get_inscription_by_id_unsafe(self.inscription_id)
.unwrap_or(None)
{
Some(inscription) => {
self.enrich_content(inscription);
}
None => {
warn!("could not find inscription for id {}", self.inscription_id);

let txid = old_satpoint.outpoint.txid;
self.old_owner = OLD_OWNER_CACHE.get(&txid).unwrap_or_else(|| {
let old_owner = index
.get_transaction(old_satpoint.outpoint.txid)
.unwrap_or(None)
.and_then(|tx| {
tx.output
.get(old_satpoint.outpoint.vout as usize)
.and_then(|txout| {
Address::from_script(&txout.script_pubkey, StreamEvent::get_network())
.map_err(|e| {
warn!(
"StreamEvent::with_transfer could not parse old_owner address: {}",
e
);
})
.ok()
})
});
OLD_OWNER_CACHE.set(txid, old_owner.clone());
old_owner
});

// Only enrich the inscription if it is a BRC20 transfer
if *IS_BRC20 {
match index
.get_inscription_by_id_unsafe(self.inscription_id)
.unwrap_or(None)
{
Some(inscription) => {
self.enrich_content(inscription);
}
None => {
warn!("could not find inscription for id {}", self.inscription_id);
}
}
}
self
Expand Down Expand Up @@ -357,6 +375,14 @@ impl StreamEvent {
if env::var("KAFKA_TOPIC").is_err() {
return Ok(());
}

// skip brc20 sats mint transfer events
if let Some(brc20) = &self.brc20 {
if brc20.tick == "sats" && brc20.op == "mint" && self.old_owner.is_some() {
return Ok(());
}
}

let key = self.key();
let payload = serde_json::to_vec(&self)?;
let record = BaseRecord::to(&CLIENT.topic).key(&key).payload(&payload);
Expand Down Expand Up @@ -389,3 +415,53 @@ impl StreamEvent {
Ok(())
}
}

/// A cached value paired with the instant at which it stops being valid.
pub struct CacheEntry<V> {
    value: V,
    expires_at: Instant,
}

/// A minimal thread-safe TTL cache backed by a mutex-guarded `HashMap`.
///
/// Expired entries are evicted opportunistically: `set` sweeps the whole map
/// after each insert, and `get` drops an expired entry when it is looked up.
/// There is no background eviction thread.
pub struct Cache<K, V> {
    map: Mutex<HashMap<K, CacheEntry<V>>>,
    ttl: Duration,
}

impl<K, V> Cache<K, V>
where
    K: Eq + std::hash::Hash + Clone,
    V: Clone,
{
    /// Creates an empty cache. `ttl` is the lifetime applied to every entry;
    /// when `None`, it defaults to one minute.
    fn new(ttl: Option<Duration>) -> Cache<K, V> {
        Cache {
            map: Mutex::new(HashMap::new()),
            ttl: ttl.unwrap_or(Duration::from_secs(60)), // default: 1 minute
        }
    }

    /// Inserts `value` under `key`, replacing any previous entry, and stamps
    /// it with `now + ttl`. Also sweeps out entries that have already expired.
    pub fn set(&self, key: K, value: V) {
        // Read the clock once so the new entry's deadline and the eviction
        // sweep agree on what "now" means within this operation.
        let now = Instant::now();
        let mut map = self.map.lock().unwrap();
        map.insert(
            key,
            CacheEntry {
                value,
                expires_at: now + self.ttl,
            },
        );
        // Opportunistic cleanup keeps the map from growing without bound.
        map.retain(|_, entry| entry.expires_at > now);
    }

    /// Returns a clone of the value for `key` if present and not yet expired.
    /// An expired entry is removed on the spot and `None` is returned.
    pub fn get(&self, key: &K) -> Option<V> {
        let now = Instant::now();
        let mut map = self.map.lock().unwrap();
        if let Some(entry) = map.get(key) {
            if entry.expires_at > now {
                return Some(entry.value.clone());
            }
            // Entry is stale: drop it so later lookups don't re-check it.
            map.remove(key);
        }
        None
    }
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ use {
tokio::{runtime::Runtime, task},
};

pub use crate::{
pub use self::{
block_rarity::BlockRarity,
envelope::Envelope,
fee_rate::FeeRate,
index::Index,
inscription::Inscription,
Expand Down
2 changes: 0 additions & 2 deletions src/sat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,8 @@ mod tests {

case(0);
case(1);
case(50 * COIN_VALUE - 1);
case(50 * COIN_VALUE);
case(50 * COIN_VALUE + 1);
case(2067187500000000 - 1);
case(2067187500000000);
case(2067187500000000 + 1);
}
Expand Down
2 changes: 0 additions & 2 deletions src/subcommand/server/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ async fn get_sat_ranges(value: JsonRpcExtractor, index: Arc<Index>) -> JrpcResul
block_rarities: Vec<BlockRarityInfo>,
block_height: Height,
block_hash: Option<BlockHash>,
block_time: i64,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -120,7 +119,6 @@ async fn get_sat_ranges(value: JsonRpcExtractor, index: Arc<Index>) -> JrpcResul
block_rarities,
block_height,
block_hash: index.block_hash(Some(block_height.n())).unwrap(),
block_time: index.block_time(block_height).unwrap().unix_timestamp(),
});
sat_ranges.push(range);
}
Expand Down
2 changes: 1 addition & 1 deletion src/templates/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl BlocksHtml {
) -> Self {
Self {
last: blocks
.get(0)
.first()
.map(|(height, _)| height)
.cloned()
.unwrap_or(0),
Expand Down
2 changes: 1 addition & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ fn inscription_metadata() {
.rpc_server(&rpc_server)
.run_and_deserialize_output::<Inscribe>()
.inscriptions
.get(0)
.first()
.unwrap()
.id;

Expand Down

0 comments on commit 1d27e0e

Please sign in to comment.