Skip to content

Commit

Permalink
add timer for get_raws from cache
Browse files Browse the repository at this point in the history
  • Loading branch information
KappaShilaff committed Mar 1, 2023
1 parent bef2313 commit 25be4ef
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ impl RawCache {
pub async fn get_raws(
&self,
last_timestamp_block: i32,
timer: Arc<RwLock<i32>>,
) -> (Vec<RawTransaction>, Vec<(i32, i64)>) {
let mut lock = self.0.write().await;

let time = *timer.read().await;
let (res, cache) =
lock.drain(..)
.into_iter()
.fold((vec![], vec![]), |(mut res, mut cache), x| {
if (x.data.now as i32) < last_timestamp_block {
if (x.data.now as i32) < last_timestamp_block || time > 5 {
res.push(x)
} else {
cache.push(x)
Expand All @@ -46,6 +47,7 @@ impl RawCache {
});

*lock = cache;
*timer.write().await = 0;

res.into_iter()
.sorted_by(|x, y| match x.data.now.cmp(&y.data.now) {
Expand Down
19 changes: 15 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ pub fn test_from_raw_transactions(
.unwrap();

let timestamp_last_block = Arc::new(RwLock::new(i32::MAX));

let time = Arc::new(RwLock::new(0));
{
let time = time.clone();
tokio::spawn(timer(time));
}
tokio::spawn(parse_raw_transaction(
parser,
tx_parsed_events,
Expand All @@ -78,6 +82,7 @@ pub fn test_from_raw_transactions(
rx_commit,
RawCache::new(),
timestamp_last_block,
time,
));
BufferedConsumerChannels {
rx_parsed_events,
Expand Down Expand Up @@ -158,7 +163,7 @@ async fn parse_kafka_transactions(
"COMMIT KAFKA {} transactions timestamp_block {} date: {}",
count,
transaction_time,
NaiveDateTime::from_timestamp(transaction_time, 0)
NaiveDateTime::from_timestamp_opt(transaction_time, 0).unwrap()
);
count = 0;
*time.write().await = 0;
Expand All @@ -178,6 +183,7 @@ async fn parse_kafka_transactions(
let parser = parser.clone();
let raw_cache = raw_cache.clone();
let timestamp_last_block = timestamp_last_block.clone();
let timer = time.clone();
tokio::spawn(parse_raw_transaction(
parser,
tx_parsed_events,
Expand All @@ -186,6 +192,7 @@ async fn parse_kafka_transactions(
commit_rx,
raw_cache,
timestamp_last_block,
timer,
));
}

Expand All @@ -201,6 +208,7 @@ async fn parse_kafka_transactions(
let transaction: Transaction = produced_transaction.transaction.clone();
let transaction_timestamp = transaction.now;
*timestamp_last_block.write().await = transaction_timestamp as i32;
*time.write().await = 0;
if buff_extract_events(&transaction, transaction.hash().unwrap(), &parser).is_some() {
insert_raw_transaction(transaction.clone().into(), &config.pg_pool)
.await
Expand All @@ -214,13 +222,14 @@ async fn parse_kafka_transactions(
log::info!(
"KAFKA 5_000 transactions timestamp_block {} date: {}",
transaction_timestamp,
NaiveDateTime::from_timestamp(transaction_timestamp as i64, 0)
NaiveDateTime::from_timestamp_opt(transaction_timestamp as i64, 0).unwrap()
);
i = 0;
}
}
}

#[allow(clippy::too_many_arguments)]
async fn parse_raw_transaction(
parser: TransactionParser,
mut tx: Sender<Vec<(ParsedOutput<AnyExtractableOutput>, RawTransaction)>>,
Expand All @@ -229,6 +238,7 @@ async fn parse_raw_transaction(
mut commit_rx: Receiver<()>,
raw_cache: RawCache,
timestamp_last_block: Arc<RwLock<i32>>,
timer: Arc<RwLock<i32>>
) {
let count_not_processed = get_count_not_processed_raw_transactions(&pg_pool).await;
let mut i: i64 = 0;
Expand Down Expand Up @@ -283,7 +293,8 @@ async fn parse_raw_transaction(

loop {
let (raw_transactions, times) =
raw_cache.get_raws(*timestamp_last_block.read().await).await;
raw_cache.get_raws(*timestamp_last_block.read().await, timer.clone()).await;

if raw_transactions.is_empty() {
sleep(Duration::from_secs(1)).await;
continue;
Expand Down

0 comments on commit 25be4ef

Please sign in to comment.