Skip to content

Commit

Permalink
miner actor events
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jan 8, 2024
1 parent 9a22c67 commit 2ce92e6
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 82 deletions.
6 changes: 2 additions & 4 deletions actors/market/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,10 +1253,8 @@ pub fn terminate_deals_for(
}

pub fn terminate_deals(rt: &MockRuntime, miner_addr: Address, sectors: &[SectorNumber]) {
let all_deal_ids = sectors
.iter()
.flat_map(|s| get_sector_deal_ids(rt, &miner_addr, *s))
.collect::<Vec<_>>();
let all_deal_ids =
sectors.iter().flat_map(|s| get_sector_deal_ids(rt, &miner_addr, *s)).collect::<Vec<_>>();

let ret = terminate_deals_raw(rt, miner_addr, sectors, all_deal_ids).unwrap();
assert!(ret.is_none());
Expand Down
57 changes: 52 additions & 5 deletions actors/miner/src/emit.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use cid::Cid;
use fvm_shared::bigint::BigInt;
use fvm_shared::piece::PaddedPieceSize;
// A namespace for helpers that build and emit provider Actor events.
use fil_actors_runtime::runtime::Runtime;
use fil_actors_runtime::{ActorError, EventBuilder};
Expand All @@ -11,16 +14,32 @@ pub fn sector_precommitted(rt: &impl Runtime, sector: SectorNumber) -> Result<()
}

/// Indicates a sector has been activated.
pub fn sector_activated(rt: &impl Runtime, sector: SectorNumber) -> Result<(), ActorError> {
pub fn sector_activated(
rt: &impl Runtime,
sector: SectorNumber,
unsealed_cid: &Cid,
pieces: &[(Cid, PaddedPieceSize)],
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new().typ("sector-activated").field_indexed("sector", &sector).build()?,
&EventBuilder::new()
.typ("sector-activated")
.with_sector_info(sector, unsealed_cid, pieces)
.build()?,
)
}

/// Indicates a sector has been updated.
pub fn sector_updated(rt: &impl Runtime, sector: SectorNumber) -> Result<(), ActorError> {
pub fn sector_updated(
rt: &impl Runtime,
sector: SectorNumber,
unsealed_cid: &Cid,
pieces: &[(Cid, PaddedPieceSize)],
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new().typ("sector-updated").field_indexed("sector", &sector).build()?,
&EventBuilder::new()
.typ("sector-updated")
.with_sector_info(sector, unsealed_cid, pieces)
.build()?,
)
}

Expand All @@ -29,4 +48,32 @@ pub fn sector_terminated(rt: &impl Runtime, sector: SectorNumber) -> Result<(),
rt.emit_event(
&EventBuilder::new().typ("sector-terminated").field_indexed("sector", &sector).build()?,
)
}
}

trait WithSectorInfo {
fn with_sector_info(
self,
sector: SectorNumber,
unsealed_cid: &Cid,
pieces: &[(Cid, PaddedPieceSize)],
) -> EventBuilder;
}

impl WithSectorInfo for EventBuilder {
fn with_sector_info(
self,
sector: SectorNumber,
unsealed_cid: &Cid,
pieces: &[(Cid, PaddedPieceSize)],
) -> EventBuilder {
let mut event =
self.field_indexed("sector", &sector).field_indexed("unsealed-cid", unsealed_cid);

for piece in pieces {
event = event
.field_indexed("piece-cid", &piece.0)
.field("piece-size", &BigInt::from(piece.1 .0));
}
event
}
}
12 changes: 12 additions & 0 deletions actors/miner/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod market {
pub const VERIFY_DEALS_FOR_ACTIVATION_METHOD: u64 = 5;
pub const BATCH_ACTIVATE_DEALS_METHOD: u64 = 6;
pub const ON_MINER_SECTORS_TERMINATE_METHOD: u64 = 7;
pub const GET_DEAL_DATA_COMMITMENT: u64 = frc42_dispatch::method_hash!("GetDealDataCommitment");

#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct SectorDeals {
Expand Down Expand Up @@ -93,6 +94,17 @@ pub mod market {
pub struct VerifyDealsForActivationReturn {
pub unsealed_cids: Vec<Option<Cid>>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct GetDealDataCommitmentParams {
pub deal_id: DealID,
}

#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct GetDealDataCommitmentReturn {
pub data: Cid,
pub size: PaddedPieceSize,
}
}

pub mod power {
Expand Down
86 changes: 71 additions & 15 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::cmp;
use std::cmp::max;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::ops::Neg;

use anyhow::{anyhow, Error};
Expand All @@ -20,7 +20,7 @@ use fvm_shared::clock::{ChainEpoch, QuantSpec};
use fvm_shared::deal::DealID;
use fvm_shared::econ::TokenAmount;
use fvm_shared::error::*;
use fvm_shared::piece::PieceInfo;
use fvm_shared::piece::{PaddedPieceSize, PieceInfo};
use fvm_shared::randomness::*;
use fvm_shared::reward::ThisEpochRewardReturn;
use fvm_shared::sector::*;
Expand Down Expand Up @@ -825,8 +825,19 @@ impl Actor {
activate_sectors_deals(rt, &data_activation_inputs, compute_commd)?;
let activated_precommits = batch_return.successes(&valid_precommits);

activate_new_sector_infos(rt, activated_precommits, activated_data, &pledge_inputs, &info)?;
activate_new_sector_infos(
rt,
activated_precommits.clone(),
activated_data,
&pledge_inputs,
&info,
)?;

for pc in activated_precommits {
let unsealed_cid = pc.info.unsealed_cid.get_cid(pc.info.seal_proof)?;
let pieces = get_piece_manifest_from_deals(rt, &pc.info.deal_ids)?;
emit::sector_activated(rt, pc.info.sector_number, &unsealed_cid, &pieces)?;
}
// The aggregate fee is paid on the sectors successfully proven.
pay_aggregate_seal_proof_fee(rt, valid_precommits.len())?;
Ok(())
Expand Down Expand Up @@ -965,6 +976,9 @@ impl Actor {
sector_info: usi.sector_info,
activated_data,
});

let pieces = get_piece_manifest_from_deals(rt, &usi.update.deals)?;
emit::sector_updated(rt, usi.update.sector_number, &computed_commd, &pieces)?;
}

let (power_delta, pledge_delta) = update_replica_states(
Expand Down Expand Up @@ -1002,6 +1016,7 @@ impl Actor {
));
}

let mut sector_commds: HashMap<SectorNumber, Cid> = HashMap::new();
// Load sector infos for validation, failing if any don't exist.
let mut sectors = Sectors::load(&store, &state.sectors)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to load sectors array")?;
Expand All @@ -1025,6 +1040,8 @@ impl Actor {
// Validation needs to accept this empty proof.
replica_proof: params.sector_proofs.get(i).unwrap_or(&RawBytes::default()).clone(),
});

sector_commds.insert(update.sector, computed_commd);
}

// Validate inputs.
Expand Down Expand Up @@ -1165,6 +1182,17 @@ impl Actor {
sector_expiration: sector_info.expiration,
pieces: &update.pieces,
});

// create a Vec<(Cid, PaddedPieceSize)> from update.pieces and call emit::sector_updated
let pieces: Vec<(Cid, PaddedPieceSize)> =
update.pieces.iter().map(|x| (x.cid, x.size)).collect();

emit::sector_updated(
rt,
update.sector,
sector_commds.get(&update.sector).unwrap(),
&pieces,
)?;
}
notify_data_consumers(rt, &notifications, params.require_notification_success)?;

Expand Down Expand Up @@ -1865,6 +1893,12 @@ impl Actor {
sector_expiration: sector.info.expiration,
pieces: &activations.pieces,
});

let pieces: Vec<(Cid, PaddedPieceSize)> =
activations.pieces.iter().map(|p| (p.cid, p.size)).collect();
let unsealed_cid = sector.info.unsealed_cid.get_cid(sector.info.seal_proof)?;

emit::sector_activated(rt, sector.info.sector_number, &unsealed_cid, &pieces)?;
}
notify_data_consumers(rt, &notifications, params.require_notification_success)?;

Expand Down Expand Up @@ -1973,11 +2007,18 @@ impl Actor {
};
activate_new_sector_infos(
rt,
successful_activations,
successful_activations.clone(),
data_activations,
&pledge_inputs,
&info,
)
)?;

for pc in successful_activations.iter() {
let pieces = get_piece_manifest_from_deals(rt, &pc.info.deal_ids)?;
let unsealed_cid = pc.info.unsealed_cid.get_cid(pc.info.seal_proof)?;
emit::sector_activated(rt, pc.info.sector_number, &unsealed_cid, &pieces)?;
}
Ok(())
}

fn check_sector_proven(
Expand Down Expand Up @@ -3978,8 +4019,6 @@ where
expected_count
));
}
let updated_sector_nums =
new_sectors.iter().map(|x| x.sector_number).collect::<Vec<SectorNumber>>();

// Overwrite sector infos.
sectors.store(new_sectors).map_err(|e| {
Expand All @@ -3993,10 +4032,6 @@ where
e.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to save deadlines")
})?;

for sector in updated_sector_nums {
emit::sector_updated(rt, sector)?;
}

// Update pledge.
let current_balance = rt.current_balance();
if pledge_delta.is_positive() {
Expand Down Expand Up @@ -4807,6 +4842,31 @@ fn verify_deals(
))?)
}

fn get_piece_manifest_from_deals(
rt: &impl Runtime,
deal_ids: &Vec<DealID>,
) -> Result<Vec<(Cid, PaddedPieceSize)>, ActorError> {
let mut pieces: Vec<(Cid, PaddedPieceSize)> = vec![];

for deal_id in deal_ids {
let ret = get_deal_data_commitment(rt, deal_id)?;
pieces.push((ret.data, ret.size));
}
Ok(pieces)
}

fn get_deal_data_commitment(
rt: &impl Runtime,
deal_id: &DealID,
) -> Result<ext::market::GetDealDataCommitmentReturn, ActorError> {
deserialize_block(extract_send_result(rt.send_simple(
&STORAGE_MARKET_ACTOR_ADDR,
ext::market::GET_DEAL_DATA_COMMITMENT,
IpldBlock::serialize_cbor(&ext::market::GetDealDataCommitmentParams { deal_id: *deal_id })?,
TokenAmount::zero(),
))?)
}

/// Requests the current epoch target block reward from the reward actor.
/// return value includes reward, smoothed estimate of reward, and baseline power
fn request_current_epoch_block_reward(
Expand Down Expand Up @@ -5276,10 +5336,6 @@ fn activate_new_sector_infos(

state.check_balance_invariants(&rt.current_balance()).map_err(balance_invariants_broken)?;

for sector in new_sector_numbers {
emit::sector_activated(rt, sector)?;
}

Ok((total_pledge, newly_vested))
})?;
// Request pledge update for activated sectors.
Expand Down
8 changes: 7 additions & 1 deletion actors/miner/tests/prove_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,13 @@ fn drop_invalid_prove_commit_while_processing_valid_one() {
verify_deals_exit: HashMap::from([(sector_no_a, ExitCode::USR_ILLEGAL_ARGUMENT)]),
..Default::default()
};
h.confirm_sector_proofs_valid_for(&rt, conf, vec![pre_commit_a, pre_commit_b], vec![sector_no_b]).unwrap();
h.confirm_sector_proofs_valid_for(
&rt,
conf,
vec![pre_commit_a, pre_commit_b],
vec![sector_no_b],
)
.unwrap();
let st = h.get_state(&rt);
assert!(st.get_sector(&rt.store, sector_no_a).unwrap().is_none());
assert!(st.get_sector(&rt.store, sector_no_b).unwrap().is_some());
Expand Down
8 changes: 4 additions & 4 deletions actors/miner/tests/prove_commit2_failures_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn reject_precommit_deals() {
expect_abort_contains_message(
ExitCode::USR_ILLEGAL_ARGUMENT,
"invalid pre-commit 0 while requiring activation success",
h.prove_commit_sectors2(&rt, &manifests, true, false, false, cfg),
h.prove_commit_sectors2_for(&rt, &manifests, true, false, false, cfg, vec![]),
);
h.check_state(&rt);
}
Expand All @@ -146,7 +146,7 @@ fn reject_all_proofs_fail() {
expect_abort_contains_message(
ExitCode::USR_ILLEGAL_ARGUMENT,
"no valid proofs",
h.prove_commit_sectors2(&rt, &activations, false, false, false, cfg),
h.prove_commit_sectors2_for(&rt, &activations, false, false, false, cfg, vec![]),
);
h.check_state(&rt);
}
Expand All @@ -159,7 +159,7 @@ fn reject_aggregate_proof_fails() {
expect_abort_contains_message(
ExitCode::USR_ILLEGAL_ARGUMENT,
"invalid aggregate proof",
h.prove_commit_sectors2(&rt, &activations, false, false, true, cfg),
h.prove_commit_sectors2_for(&rt, &activations, false, false, true, cfg, vec![]),
);
h.check_state(&rt);
}
Expand All @@ -172,7 +172,7 @@ fn reject_required_proof_failure() {
expect_abort_contains_message(
ExitCode::USR_ILLEGAL_ARGUMENT,
"invalid proof for sector 100 while requiring activation success",
h.prove_commit_sectors2(&rt, &activations, true, false, false, cfg),
h.prove_commit_sectors2_for(&rt, &activations, true, false, false, cfg, vec![]),
);
h.check_state(&rt);
}
Expand Down
Loading

0 comments on commit 2ce92e6

Please sign in to comment.