Skip to content

Commit

Permalink
finish cache
Browse files Browse the repository at this point in the history
  • Loading branch information
KappaShilaff committed Jun 25, 2022
1 parent 0a492b6 commit 3cbe9c7
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 45 deletions.
33 changes: 19 additions & 14 deletions src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::models::RawTransactionFromDb;
use crate::sqlx_client::get_all_raw_transactions;
use crate::RawTransaction;
use chrono::Utc;
use itertools::Itertools;
Expand All @@ -7,27 +7,28 @@ use std::cmp::Ordering;
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
pub struct RawCache(Arc<RwLock<Vec<RawTransaction>>>);

pub async fn get_raws() -> Vec<RawTransactionFromDb> {
todo!()
}

impl RawCache {
async fn new(pg_pool: &Pool<Postgres>) -> Self {
let raw_transactions = get_raws()
pub fn new() -> Self {
Self(Arc::new(RwLock::new(vec![])))
}

pub async fn fill_raws(&self, pg_pool: &Pool<Postgres>) {
let raw_transactions = get_all_raw_transactions(pg_pool)
.await
.into_iter()
.map(RawTransaction::from)
.collect_vec();
Self(Arc::new(RwLock::new(raw_transactions)))
.map(|x| x.into_iter().map(RawTransaction::from).collect_vec())
.unwrap_or_default();

*self.0.write().await = raw_transactions;
}

async fn insert_raw(&self, raw: RawTransaction) {
pub async fn insert_raw(&self, raw: RawTransaction) {
self.0.write().await.push(raw);
}

async fn get_raws(&self, delay: i32) -> Vec<RawTransaction> {
pub async fn get_raws(&self, delay: i32) -> (Vec<RawTransaction>, Vec<(i32, i64)>) {
let timestamp_now = Utc::now().timestamp() as i32;
let mut lock = self.0.write().await;

Expand All @@ -51,6 +52,10 @@ impl RawCache {
Ordering::Equal => x.data.lt.cmp(&y.data.lt),
Ordering::Greater => Ordering::Greater,
})
.collect_vec()
.fold((vec![], vec![]), |(mut raws, mut times), x| {
times.push((x.data.now as i32, x.data.lt as i64));
raws.push(x);
(raws, times)
})
}
}
62 changes: 47 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
mod cache;
pub mod drop_base;
pub mod models;
mod sqlx_client;
mod cache;

use crate::cache::RawCache;
use crate::models::{BufferedConsumerChannels, BufferedConsumerConfig, RawTransaction};
use crate::sqlx_client::{
create_table_raw_transactions, get_count_not_processed_raw_transactions,
get_count_raw_transactions, get_raw_transactions, insert_raw_transaction,
insert_raw_transactions,
insert_raw_transactions, update_raw_transactions_set_processed_true,
};
use chrono::{NaiveDateTime, Utc};
use futures::channel::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -74,6 +75,7 @@ pub fn test_from_raw_transactions(
pg_pool,
0,
rx_commit,
RawCache::new(),
));
BufferedConsumerChannels {
rx_parsed_events,
Expand Down Expand Up @@ -106,6 +108,8 @@ async fn parse_kafka_transactions(
.build()
.unwrap();

let raw_cache = RawCache::new();

let time = Arc::new(RwLock::new(0));
{
let time = time.clone();
Expand Down Expand Up @@ -162,16 +166,20 @@ async fn parse_kafka_transactions(
.expect("cant insert raw_transaction: rip db");
}

log::info!("kafka synced");

{
let pg_pool = config.pg_pool.clone();
let parser = parser.clone();
let raw_cache = raw_cache.clone();
tokio::spawn(parse_raw_transaction(
parser,
tx_parsed_events,
notify_for_services,
pg_pool,
config.delay,
commit_rx,
raw_cache,
));
}

Expand All @@ -187,9 +195,10 @@ async fn parse_kafka_transactions(
let transaction: Transaction = produced_transaction.transaction.clone();
let transaction_timestamp = transaction.now;
if buff_extract_events(&transaction, transaction.hash().unwrap(), &parser).is_some() {
insert_raw_transaction(transaction.into(), &config.pg_pool)
insert_raw_transaction(transaction.clone().into(), &config.pg_pool)
.await
.expect("cant insert raw_transaction to db");
raw_cache.insert_raw(transaction.into()).await;
}

produced_transaction.commit().expect("dead stream kafka");
Expand All @@ -212,8 +221,8 @@ async fn parse_raw_transaction(
pg_pool: PgPool,
secs_delay: i32,
mut commit_rx: Receiver<()>,
raw_cache: RawCache,
) {
let mut notified = false;
let count_not_processed = get_count_not_processed_raw_transactions(&pg_pool).await;
let mut i: i64 = 0;

Expand All @@ -223,17 +232,13 @@ async fn parse_raw_transaction(
let mut begin = pg_pool.begin().await.expect("cant get pg transaction");

let raw_transactions_from_db =
get_raw_transactions(1000, timestamp_now - secs_delay, &mut begin)
get_raw_transactions(10_000, timestamp_now - secs_delay, &mut begin)
.await
.unwrap_or_default();

if raw_transactions_from_db.is_empty() {
if !notified {
notify.notify_one();
notified = true;
}
sleep(Duration::from_secs(1)).await;
continue;
notify.notify_one();
break;
}

let mut send_message = vec![];
Expand All @@ -260,14 +265,41 @@ async fn parse_raw_transaction(
}
begin.commit().await.expect("cant commit db update");

if !notified && i <= count_not_processed {
if i >= count_not_processed {
log::info!("end parse. synced");
notify.notify_one();
break;
} else {
log::info!("parsing {}/{}", i, count_not_processed);
}
}

if !notified && i >= count_not_processed {
notify.notify_one();
notified = true;
raw_cache.fill_raws(&pg_pool).await;

loop {
let (raw_transactions, times) = raw_cache.get_raws(secs_delay).await;
if raw_transactions.is_empty() {
sleep(Duration::from_secs(1)).await;
continue;
}

let mut send_message = vec![];
for raw_transaction in raw_transactions {
i += 1;

if let Some(events) =
buff_extract_events(&raw_transaction.data, raw_transaction.hash, &parser)
{
send_message.push((events, raw_transaction));
};
}

if !send_message.is_empty() {
tx.send(send_message).await.expect("dead sender");
commit_rx.next().await;
}

update_raw_transactions_set_processed_true(&pg_pool, times).await;
}
}

Expand Down
25 changes: 24 additions & 1 deletion src/models.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::Context;
use futures::channel::mpsc::{Receiver, Sender};
use indexer_lib::{AnyExtractable, AnyExtractableOutput, ParsedOutput};
use sqlx::PgPool;
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use tokio::sync::Notify;
use ton_block::{Deserializable, GetRepresentationHash, Serializable, Transaction};
Expand Down Expand Up @@ -53,6 +54,19 @@ pub struct RawTransactionFromDb {
pub processed: bool,
}

impl From<PgRow> for RawTransactionFromDb {
fn from(x: PgRow) -> Self {
RawTransactionFromDb {
transaction: x.get(0),
transaction_hash: x.get(1),
timestamp_block: x.get(2),
timestamp_lt: x.get(3),
created_at: x.get(4),
processed: x.get(5),
}
}
}

#[derive(Clone, Debug)]
pub struct RawTransaction {
pub hash: UInt256,
Expand Down Expand Up @@ -86,3 +100,12 @@ impl From<Transaction> for RawTransactionFromDb {
}
}
}

impl From<Transaction> for RawTransaction {
fn from(x: Transaction) -> Self {
RawTransaction {
hash: x.hash().unwrap(),
data: x,
}
}
}
82 changes: 67 additions & 15 deletions src/sqlx_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WHERE (timestamp_block, timestamp_lt) IN (SELECT timestamp_block, timestamp_lt
LIMIT $2)
returning *;";

const GET_ALL_RAW_TRANSACTIONS_QUERY: &str = "SELECT * FROM raw_transactions WHERE processed = false ORDER BY (timestamp_block, timestamp_lt);";

const CREATE_TABLE_RAW_TRANSACTIONS_QUERY: &str = "CREATE TABLE IF NOT EXISTS raw_transactions
(
transaction BYTEA NOT NULL,
Expand All @@ -41,7 +43,8 @@ const CREATE_TABLE_DROP_BASE_INDEX_QUERY: &str = "CREATE TABLE IF NOT EXISTS dro
value INTEGER NOT NULL
);";

const INSERT_DROP_BASE_INDEX_QUERY: &str = "INSERT INTO drop_base_index (name, value) values ('index', $1);";
const INSERT_DROP_BASE_INDEX_QUERY: &str =
"INSERT INTO drop_base_index (name, value) values ('index', $1);";

const SELECT_DROP_BASE_INDEX_QUERY: &str = "SELECT value FROM drop_base_index;";

Expand Down Expand Up @@ -77,6 +80,8 @@ $$;";

const CREATE_INDEX_RAW_TRANSACTIONS_QUERY: &str = "CREATE INDEX IF NOT EXISTS raw_transactions_ix_ttp ON raw_transactions (timestamp_block, timestamp_lt, processed);";

const UPDATE_RAW_TRANSACTIONS_PROCESSED_TRUE_QUERY: &str = "UPDATE raw_transactions SET processed = true WHERE (timestamp_block, timestamp_lt) IN (SELECT * FROM UNNEST ($1, $2));";

pub async fn insert_raw_transaction(
raw_transaction: RawTransactionFromDb,
pg_pool: &Pool<Postgres>,
Expand Down Expand Up @@ -109,14 +114,7 @@ pub async fn get_raw_transactions(
.await
.map(|y| {
y.into_iter()
.map(|x| RawTransactionFromDb {
transaction: x.get(0),
transaction_hash: x.get(1),
timestamp_block: x.get(2),
timestamp_lt: x.get(3),
created_at: x.get(4),
processed: x.get(5),
})
.map(RawTransactionFromDb::from)
.sorted_by(|x, y| match x.timestamp_block.cmp(&y.timestamp_block) {
Ordering::Less => Ordering::Less,
Ordering::Equal => x.timestamp_lt.cmp(&y.timestamp_lt),
Expand Down Expand Up @@ -196,7 +194,8 @@ pub async fn insert_raw_transactions(
pub async fn create_drop_index_table(pg_pool: &Pool<Postgres>) {
if let Err(e) = sqlx::query(CREATE_TABLE_DROP_BASE_INDEX_QUERY)
.execute(pg_pool)
.await {
.await
{
log::error!("create table drop_index ERROR {}", e);
}
}
Expand All @@ -212,21 +211,74 @@ pub async fn get_drop_index(pg_pool: &Pool<Postgres>) -> Result<i32, anyhow::Err
pub async fn insert_drop_index(pg_pool: &Pool<Postgres>, index: i32) {
if let Err(e) = sqlx::query(INSERT_DROP_BASE_INDEX_QUERY)
.bind(index)
.execute(pg_pool).await {
.execute(pg_pool)
.await
{
log::error!("insert index drop ERROR {}", e);
}
}

pub async fn drop_tables(pg_pool: &Pool<Postgres>) {
if let Err(e) = sqlx::query(DROP_TABLES_QUERY)
.execute(pg_pool).await {
if let Err(e) = sqlx::query(DROP_TABLES_QUERY).execute(pg_pool).await {
log::error!("drop tables ERROR {}", e);
}
}

pub async fn drop_functions(pg_pool: &Pool<Postgres>) {
if let Err(e) = sqlx::query(DROP_FUNCTIONS_QUERY)
.execute(pg_pool).await {
if let Err(e) = sqlx::query(DROP_FUNCTIONS_QUERY).execute(pg_pool).await {
log::error!("drop functions ERROR {}", e);
}
}

pub async fn get_all_raw_transactions(
pg_pool: &Pool<Postgres>,
) -> Result<Vec<RawTransactionFromDb>, anyhow::Error> {
sqlx::query(GET_ALL_RAW_TRANSACTIONS_QUERY)
.fetch_all(pg_pool)
.await
.map(|x| x.into_iter().map(RawTransactionFromDb::from).collect_vec())
.map_err(anyhow::Error::new)
}

pub async fn update_raw_transactions_set_processed_true(
pg_pool: &Pool<Postgres>,
times: Vec<(i32, i64)>,
) {
let (timestamp_blocks, timestamp_lts) =
times
.into_iter()
.fold((vec![], vec![]), |(mut block, mut lt), x| {
block.push(x.0);
lt.push(x.1);
(block, lt)
});

let mut args = PgArguments::default();
args.add(timestamp_blocks);
args.add(timestamp_lts);

if let Err(e) = sqlx::query_with(UPDATE_RAW_TRANSACTIONS_PROCESSED_TRUE_QUERY, args)
.execute(pg_pool)
.await
{
log::error!("update raw transactions processed true ERROR {}", e);
}
}

#[cfg(test)]
mod test {
use sqlx::PgPool;
use crate::{insert_raw_transactions, update_raw_transactions_set_processed_true};
use crate::models::RawTransactionFromDb;

#[tokio::test]
async fn test_insert() {
let pg_pool =
PgPool::connect("postgresql://postgres:postgres@localhost:5432/test_base")
.await
.unwrap();

let times = (vec![(1656071372, 27915771000001_i64), (1656070201, 27915328000006)]);
update_raw_transactions_set_processed_true(&pg_pool, times).await;
}
}

0 comments on commit 3cbe9c7

Please sign in to comment.