Skip to content

Commit

Permalink
Ocean: Cleanup data access (#3044)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jouzo authored Sep 12, 2024
1 parent 48a383c commit ba4f838
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 214 deletions.
16 changes: 4 additions & 12 deletions lib/ain-ocean/src/api/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,11 @@ fn get_latest_aggregation(
.script_aggregation
.by_id
.list(Some((hid.clone(), u32::MAX)), SortOrder::Descending)?
.take(1)
.take_while(|item| match item {
Ok(((v, _), _)) => v == &hid,
_ => true,
})
.map(|item| {
let (_, v) = item?;
let res = v.into();
Ok(res)
})
.collect::<Result<Vec<_>>>()?;
.find(|item| matches!(item, Ok(((v, _), _)) if v == &hid))
.transpose()?
.map(|(_, v)| v.into());

Ok(latest.first().cloned())
Ok(latest)
}

#[ocean_endpoint]
Expand Down
27 changes: 9 additions & 18 deletions lib/ain-ocean/src/api/loan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,16 @@ async fn get_active_price(
) -> Result<Option<OraclePriceActive>> {
let (token, currency) = parse_fixed_interval_price(&fixed_interval_price_id)?;
let repo = &ctx.services.oracle_price_active;
let keys = repo
let Some((_, id)) = repo
.by_key
.list(Some((token, currency)), SortOrder::Descending)?
.take(1)
.flatten()
.collect::<Vec<_>>();

if keys.is_empty() {
return Ok(None);
}

let Some((_, id)) = keys.first() else {
.next()
.transpose()?
else {
return Ok(None);
};

let price = repo.by_id.get(id)?;
let price = repo.by_id.get(&id)?;

let Some(price) = price else {
return Ok(None);
Expand Down Expand Up @@ -681,16 +675,13 @@ async fn map_token_amounts(
.list(None, SortOrder::Descending)?
.collect::<Vec<_>>();
log::trace!("list_auctions keys: {:?}, token_id: {:?}", keys, id);

let active_price = repo
.by_key
.list(None, SortOrder::Descending)?
.take(1)
.take_while(|item| match item {
Ok((k, _)) => k.0 == id,
_ => true,
})
.find(|item| matches!(item, Ok((k, _)) if k.0 == id))
.map(|el| repo.by_key.retrieve_primary_value(el))
.collect::<Result<Vec<_>>>()?;
.transpose()?;

vault_token_amounts.push(VaultTokenAmountResponse {
id,
Expand All @@ -699,7 +690,7 @@ async fn map_token_amounts(
symbol: token_info.symbol,
symbol_key: token_info.symbol_key,
name: token_info.name,
active_price: active_price.first().cloned(),
active_price,
});
}

Expand Down
22 changes: 7 additions & 15 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,27 +543,19 @@ pub async fn get_aggregated_in_usd(
}

fn call_dftx(ctx: &Arc<AppContext>, txid: Txid) -> Result<Option<DfTx>> {
let vout = ctx
let Some(vout) = ctx
.services
.transaction
.vout_by_id
.list(Some((txid, 0)), SortOrder::Ascending)?
.take(1)
.take_while(|item| match item {
Ok((_, vout)) => vout.txid == txid,
_ => true,
})
.map(|item| {
let (_, v) = item?;
Ok(v)
})
.collect::<Result<Vec<_>>>()?;

if vout.is_empty() {
.find(|item| matches!(item, Ok((_, vout)) if vout.txid == txid))
.transpose()?
.map(|(_, v)| v)
else {
return Ok(None);
}
};

let bytes = &vout[0].script.hex;
let bytes = &vout.script.hex;
if bytes.len() > 6 && bytes[0] == 0x6a && bytes[1] <= 0x4e {
let offset = 1 + match bytes[1] {
0x4c => 2,
Expand Down
17 changes: 4 additions & 13 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async fn get_oracles(

let mut prices = Vec::new();
for oracle in oracles {
let feeds = ctx
let feed = ctx
.services
.oracle_price_feed
.by_id
Expand All @@ -296,18 +296,9 @@ async fn get_oracles(
)),
SortOrder::Descending,
)?
.take(1)
.take_while(|item| match item {
Ok((k, _)) => k.0 == token && k.1 == currency && k.2 == oracle.oracle_id,
_ => true,
})
.map(|item| {
let (_, data) = item?;
Ok(data)
})
.collect::<Result<Vec<_>>>()?;

let feed = feeds.first().cloned();
.find(|item| matches!(item, Ok((k, _)) if k.0 == token && k.1 == currency && k.2 == oracle.oracle_id))
.transpose()?
.map(|(_, data)| data);

prices.push(PriceOracleResponse {
id: format!("{}-{}-{}", oracle.id.0, oracle.id.1, oracle.id.2),
Expand Down
32 changes: 10 additions & 22 deletions lib/ain-ocean/src/indexer/loan_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,44 +166,32 @@ pub fn perform_active_price_tick(
block: &BlockContext,
) -> Result<()> {
let repo = &services.oracle_price_aggregated;
let prev_keys = repo
let Some((_, prev_id)) = repo
.by_key
.list(Some(ticker_id.clone()), SortOrder::Descending)?
.take(1)
.flatten() // return empty vec if none
.collect::<Vec<_>>();

if prev_keys.is_empty() {
return Ok(());
}

let Some((_, prev_id)) = prev_keys.first() else {
.next()
.transpose()?
else {
return Ok(());
};

let aggregated_price = repo.by_id.get(prev_id)?;
let aggregated_price = repo.by_id.get(&prev_id)?;

let Some(aggregated_price) = aggregated_price else {
return Ok(());
};

let repo = &services.oracle_price_active;
let prev_keys = repo
let Some((_, prev_id)) = repo
.by_key
.list(Some(ticker_id.clone()), SortOrder::Descending)?
.take(1)
.flatten()
.collect::<Vec<_>>();

if prev_keys.is_empty() {
return Ok(());
}

let Some((_, prev_id)) = prev_keys.first() else {
.next()
.transpose()?
else {
return Ok(());
};

let prev_price = repo.by_id.get(prev_id)?;
let prev_price = repo.by_id.get(&prev_id)?;

let Some(prev_price) = prev_price else {
return Ok(());
Expand Down
40 changes: 14 additions & 26 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,22 @@ fn index_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Re

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let repository = &services.pool_swap_aggregated;
let bucket = get_bucket(block, i64::from(interval));

let prevs = repository
let repository = &services.pool_swap_aggregated;
if let Some(prev) = repository
.by_key
.list(
Some((pool_pair.id, interval, i64::MAX)),
SortOrder::Descending,
)?
.take(1)
.take_while(|item| match item {
Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval,
_ => true,
})
.find(|item| matches!(item, Ok((k, _)) if k.0 == pool_pair.id && k.1 == interval))
.map(|e| repository.by_key.retrieve_primary_value(e))
.collect::<Result<Vec<_>>>()?;

let bucket = get_bucket(block, i64::from(interval));

if prevs.len() == 1 && prevs[0].bucket >= bucket {
break;
.transpose()?
{
if prev.bucket >= bucket {
break;
}
}

let aggregated = PoolSwapAggregated {
Expand Down Expand Up @@ -454,24 +450,16 @@ fn index_script_aggregation(services: &Arc<Services>, block: &Block<Transaction>

for (_, mut aggregation) in record.clone() {
let repo = &services.script_aggregation;
let latest = repo
if let Some(latest) = repo
.by_id
.list(
Some((aggregation.hid.clone(), u32::MAX)),
SortOrder::Descending,
)?
.take(1)
.take_while(|item| match item {
Ok(((hid, _), _)) => &aggregation.hid == hid,
_ => true,
})
.map(|item| {
let (_, v) = item?;
Ok(v)
})
.collect::<Result<Vec<_>>>()?;

if let Some(latest) = latest.first().cloned() {
.find(|item| matches!(item, Ok(((hid, _), _)) if hid == &aggregation.hid))
.transpose()?
.map(|(_, v)| v)
{
aggregation.statistic.tx_in_count += latest.statistic.tx_in_count;
aggregation.statistic.tx_out_count += latest.statistic.tx_out_count;

Expand Down
42 changes: 18 additions & 24 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,29 +678,25 @@ pub fn index_interval_mapper(
interval: OracleIntervalSeconds,
) -> Result<()> {
let repo = &services.oracle_price_aggregated_interval;
let previous = repo
let Some((_, id)) = repo
.by_key
.list(
Some((token.clone(), currency.clone(), interval.clone())),
SortOrder::Descending,
)?
.take(1)
.flatten()
.collect::<Vec<_>>();

if previous.is_empty() {
.next()
.transpose()?
else {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}

for (_, id) in previous {
let aggregated_interval = repo.by_id.get(&id)?;
if let Some(aggregated_interval) = aggregated_interval {
if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}
};

forward_aggregate(services, &aggregated_interval, aggregated)?;
let aggregated_interval = repo.by_id.get(&id)?;
if let Some(aggregated_interval) = aggregated_interval {
if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}

forward_aggregate(services, &aggregated_interval, aggregated)?;
}

Ok(())
Expand All @@ -721,21 +717,19 @@ pub fn invalidate_oracle_interval(
Some((token.to_string(), currency.to_string(), interval.clone())),
SortOrder::Descending,
)?
.take(1)
.map(|item| {
let (_, id) = item?;
let price = services
.next()
.transpose()?
.map(|(_, id)| {
services
.oracle_price_aggregated_interval
.by_id
.get(&id)?
.context(OtherSnafu {
msg: "Missing oracle price aggregated interval index",
})?;
Ok(price)
})
})
.collect::<Result<Vec<_>>>()?;

let previous = &previous[0];
.transpose()?
.unwrap(); // TODO assert if safe

if previous.aggregated.count == 1 {
return repo.by_id.delete(&previous.id);
Expand Down
Loading

0 comments on commit ba4f838

Please sign in to comment.