Skip to content

Commit

Permalink
refactor: Remove global exchange_rate and validators_apys_ caches…
Browse files Browse the repository at this point in the history
… in order to work with `cargo test` (#3774)
  • Loading branch information
samuel-rufi authored Oct 31, 2024
1 parent f1935f4 commit 527fab1
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 84 deletions.
2 changes: 1 addition & 1 deletion crates/iota-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ The crate provides following tests currently:
# run tests requiring only postgres integration
cargo test --features pg_integration -- --test-threads 1
# run rpc tests with shared runtime
cargo test --features shared_test_runtime -- --test-threads 1
cargo test --features shared_test_runtime
```

For a better testing experience is possible to use [nextest](https://nexte.st/)
Expand Down
77 changes: 57 additions & 20 deletions crates/iota-indexer/src/apis/governance_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use cached::{SizedCache, proc_macro::cached};
use cached::{Cached, SizedCache};
use diesel::r2d2::R2D2Connection;
use iota_json_rpc::{IotaRpcModule, governance_api::ValidatorExchangeRates};
use iota_json_rpc_api::GovernanceReadApiServer;
Expand All @@ -23,6 +23,7 @@ use iota_types::{
timelock::timelocked_staked_iota::TimelockedStakedIota,
};
use jsonrpsee::{RpcModule, core::RpcResult};
use tokio::sync::Mutex;

use crate::{errors::IndexerError, indexer_reader::IndexerReader};

Expand All @@ -32,19 +33,27 @@ const MAX_QUERY_STAKED_OBJECTS: usize = 1000;
#[derive(Clone)]
pub struct GovernanceReadApi<T: R2D2Connection + 'static> {
inner: IndexerReader<T>,
exchange_rates_cache: Arc<Mutex<SizedCache<EpochId, Vec<ValidatorExchangeRates>>>>,
validators_apys_cache: Arc<Mutex<SizedCache<EpochId, BTreeMap<IotaAddress, f64>>>>,
}

impl<T: R2D2Connection + 'static> GovernanceReadApi<T> {
pub fn new(inner: IndexerReader<T>) -> Self {
Self { inner }
Self {
inner,
exchange_rates_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
validators_apys_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
}
}

/// Get a validator's APY by its address
pub async fn get_validator_apy(
&self,
address: &IotaAddress,
) -> Result<Option<f64>, IndexerError> {
let apys = validators_apys_map(self.get_validators_apy().await?);
let apys = self
.validators_apys_map(self.get_validators_apy().await?)
.await;
Ok(apys.get(address).copied())
}

Expand Down Expand Up @@ -261,6 +270,28 @@ impl<T: R2D2Connection + 'static> GovernanceReadApi<T> {
}
Ok(delegated_stakes)
}

/// Cache a map representing the validators' APYs for this epoch
async fn validators_apys_map(&self, apys: ValidatorApys) -> BTreeMap<IotaAddress, f64> {
// check if the apys are already in the cache
if let Some(cached_apys) = self
.validators_apys_cache
.lock()
.await
.cache_get(&apys.epoch)
{
return cached_apys.clone();
}

let ret = BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy)));
// insert the apys into the cache
self.validators_apys_cache
.lock()
.await
.cache_set(apys.epoch, ret.clone());

ret
}
}

fn stake_status(
Expand Down Expand Up @@ -292,15 +323,31 @@ fn stake_status(
/// Cached exchange rates for validators for the given epoch, the cache size is
/// 1, it will be cleared when the epoch changes. rates are in descending order
/// by epoch.
#[cached(
type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
create = "{ SizedCache::with_size(1) }",
convert = "{ system_state_summary.epoch }",
result = true
)]
pub async fn exchange_rates(
state: &GovernanceReadApi<impl R2D2Connection>,
system_state_summary: &IotaSystemStateSummary,
) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
let epoch = system_state_summary.epoch;

let mut cache = state.exchange_rates_cache.lock().await;

// Check if the exchange rates for the current epoch are cached
if let Some(cached_rates) = cache.cache_get(&epoch) {
return Ok(cached_rates.clone());
}

// Cache miss: compute exchange rates
let exchange_rates = compute_exchange_rates(state, system_state_summary).await?;

// Store in cache
cache.cache_set(epoch, exchange_rates.clone());

Ok(exchange_rates)
}

pub async fn compute_exchange_rates(
state: &GovernanceReadApi<impl R2D2Connection>,
system_state_summary: &IotaSystemStateSummary,
) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
// Get validator rate tables
let mut tables = vec![];
Expand Down Expand Up @@ -384,16 +431,6 @@ pub async fn exchange_rates(
Ok(exchange_rates)
}

/// Cache a map representing the validators' APYs for this epoch
#[cached(
type = "SizedCache<EpochId, BTreeMap<IotaAddress, f64>>",
create = "{ SizedCache::with_size(1) }",
convert = " {apys.epoch} "
)]
fn validators_apys_map(apys: ValidatorApys) -> BTreeMap<IotaAddress, f64> {
BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy)))
}

#[async_trait]
impl<T: R2D2Connection + 'static> GovernanceReadApiServer for GovernanceReadApi<T> {
async fn get_stakes_by_ids(
Expand Down
15 changes: 12 additions & 3 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use iota_metrics::init_metrics;
use iota_types::{
base_types::{ObjectID, SequenceNumber},
digests::TransactionDigest,
object::Object,
};
use jsonrpsee::{
http_client::{HttpClient, HttpClientBuilder},
Expand Down Expand Up @@ -57,9 +58,12 @@ impl ApiTestSetup {
GLOBAL_API_TEST_SETUP.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();

let (cluster, store, client) = runtime.block_on(
start_test_cluster_with_read_write_indexer(None, Some("shared_test_indexer_db")),
);
let (cluster, store, client) =
runtime.block_on(start_test_cluster_with_read_write_indexer(
None,
Some("shared_test_indexer_db"),
None,
));

Self {
runtime,
Expand Down Expand Up @@ -115,6 +119,7 @@ impl SimulacrumTestSetup {
pub async fn start_test_cluster_with_read_write_indexer(
stop_cluster_after_checkpoint_seq: Option<u64>,
database_name: Option<&str>,
objects: Option<Vec<Object>>,
) -> (TestCluster, PgIndexerStore<PgConnection>, HttpClient) {
let temp = tempdir().unwrap().into_path();
let mut builder = TestClusterBuilder::new().with_data_ingestion_dir(temp.clone());
Expand All @@ -126,6 +131,10 @@ pub async fn start_test_cluster_with_read_write_indexer(
)));
};

if let Some(objects) = objects {
builder = builder.with_objects(objects);
}

let cluster = builder.build().await;

// start indexer in write mode
Expand Down
Loading

0 comments on commit 527fab1

Please sign in to comment.