Skip to content

Commit

Permalink
Send SectorContentChanged from new onboarding methods (#1386)
Browse files Browse the repository at this point in the history
Send SectorContentChanged from new onboarding methods. Simplify return values to make notifications fire and forget.
  • Loading branch information
anorth committed Dec 14, 2023
1 parent 35efa32 commit 351a15a
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 149 deletions.
37 changes: 9 additions & 28 deletions actors/market/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ pub mod miner {
use cid::Cid;
use fvm_ipld_encoding::RawBytes;
use fvm_shared::clock::ChainEpoch;
use fvm_shared::error::ExitCode;
use fvm_shared::piece::PaddedPieceSize;
use fvm_shared::sector::SectorNumber;
use fvm_shared::MethodNum;

pub const CONTROL_ADDRESSES_METHOD: u64 = 2;
pub const IS_CONTROLLING_ADDRESS_EXPORTED: u64 =
frc42_dispatch::method_hash!("IsControllingAddress");
pub const SECTOR_CONTENT_CHANGED: MethodNum =
frc42_dispatch::method_hash!("SectorContentChanged");

#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct GetControlAddressesReturnParams {
Expand All @@ -54,63 +56,42 @@ pub mod miner {
pub address: Address,
}

// Notification of change committed to one or more sectors.
// The relevant state must be already committed so the receiver can observe any impacts
// at the sending miner actor.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorContentChangedParams {
// Distinct sectors with changed content.
pub sectors: Vec<SectorChanges>,
}

// Description of changes to one sector's content.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct SectorChanges {
// Identifier of sector being updated.
pub sector: SectorNumber,
// Minimum epoch until which the data is committed to the sector.
// Note the sector may later be extended without necessarily another notification.
pub minimum_commitment_epoch: ChainEpoch,
// Information about some pieces added to (or retained in) the sector.
// This may be only a subset of sector content.
// Inclusion here does not mean the piece was definitely absent previously.
// Exclusion here does not mean a piece has been removed since a prior notification.
pub added: Vec<PieceInfo>,
pub added: Vec<PieceChange>,
}

// Description of a piece of data committed to a sector.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct PieceInfo {
pub struct PieceChange {
pub data: Cid,
pub size: PaddedPieceSize,
// A receiver-specific identifier.
// E.g. an encoded deal ID which the provider claims this piece satisfies.
pub payload: RawBytes,
}

// For each piece in each sector, the notifee returns an exit code and
// (possibly-empty) result data.
// The miner actor will pass through results to its caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorContentChangedReturn {
// A result for each sector that was notified, in the same order.
pub sectors: Vec<SectorReturn>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorReturn {
// A result for each piece for the sector that was notified, in the same order.
pub added: Vec<PieceReturn>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct PieceReturn {
// Indicates whether the receiver accepted the notification.
// The caller is free to ignore this, but may chose to abort and roll back.
pub code: ExitCode,
// Receiver-specific result data about the piece, to be passed back to top level caller.
pub data: Vec<u8>,
pub accepted: bool,
}
}

Expand Down
55 changes: 20 additions & 35 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ use fil_actors_runtime::{
use fil_actors_runtime::{extract_send_result, BatchReturnGen, FIRST_ACTOR_SPECIFIC_EXIT_CODE};

use crate::balance_table::BalanceTable;
use crate::ext::miner::{
PieceReturn, SectorContentChangedParams, SectorContentChangedReturn, SectorReturn,
};
use crate::ext::verifreg::{AllocationID, AllocationRequest};

pub use self::deal::*;
Expand Down Expand Up @@ -96,7 +93,7 @@ pub enum Method {
GetDealProviderCollateralExported = frc42_dispatch::method_hash!("GetDealProviderCollateral"),
GetDealVerifiedExported = frc42_dispatch::method_hash!("GetDealVerified"),
GetDealActivationExported = frc42_dispatch::method_hash!("GetDealActivation"),
SectorContentChangedExported = frc42_dispatch::method_hash!("SectorContentChanged"),
SectorContentChangedExported = ext::miner::SECTOR_CONTENT_CHANGED,
}

/// Market Actor
Expand Down Expand Up @@ -658,8 +655,8 @@ impl Actor {
/// This is an alternative to ActivateDeals.
fn sector_content_changed(
rt: &impl Runtime,
params: SectorContentChangedParams,
) -> Result<SectorContentChangedReturn, ActorError> {
params: ext::miner::SectorContentChangedParams,
) -> Result<ext::miner::SectorContentChangedReturn, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
let miner_addr = rt.message().caller();
let curr_epoch = rt.curr_epoch();
Expand All @@ -674,29 +671,23 @@ impl Actor {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut activated_deals: HashSet<DealID> = HashSet::new();
let mut sectors_deals: Vec<(SectorNumber, SectorDealIDs)> = vec![];
let mut sectors_ret: Vec<SectorReturn> = vec![];
let mut sectors_ret: Vec<ext::miner::SectorReturn> = vec![];

for sector in &params.sectors {
let mut sector_deal_ids: Vec<DealID> = vec![];
let mut pieces_ret: Vec<PieceReturn> = vec![];
let mut pieces_ret: Vec<ext::miner::PieceReturn> = vec![];
for piece in &sector.added {
let deal_id: DealID = match deserialize(&piece.payload.clone(), "deal id") {
Ok(v) => v,
Err(e) => {
log::warn!("failed to deserialize deal id: {}", e);
pieces_ret.push(PieceReturn {
code: ExitCode::USR_SERIALIZATION,
data: vec![],
});
log::warn!("failed to deserialize deal id {:?}: {}", piece.payload, e);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
};
if activated_deals.contains(&deal_id) {
log::warn!("failed to activate duplicated deal {}", deal_id);
pieces_ret.push(PieceReturn {
code: ExitCode::USR_ILLEGAL_ARGUMENT,
data: vec![],
});
log::warn!("duplicated deal {}", deal_id);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}

Expand All @@ -713,36 +704,30 @@ impl Actor {
)? {
Ok(id) => id,
Err(e) => {
log::warn!("failed to activate: {}", e);
pieces_ret.push(PieceReturn { code: e.exit_code(), data: vec![] });
log::warn!("failed to activate deal {}: {}", deal_id, e);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
};

if piece.data != proposal.piece_cid {
log::warn!(
"failed to activate: piece CID {} doesn't match deal {} with {}",
piece.data,
"deal {} piece CID {} doesn't match {}",
deal_id,
piece.data,
proposal.piece_cid
);
pieces_ret.push(PieceReturn {
code: ExitCode::USR_ILLEGAL_ARGUMENT,
data: vec![],
});
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
if piece.size != proposal.piece_size {
log::warn!(
"failed to activate: piece size {} doesn't match deal {} with {}",
piece.size.0,
"deal {} piece size {} doesn't match {}",
deal_id,
piece.size.0,
proposal.piece_size.0
);
pieces_ret.push(PieceReturn {
code: ExitCode::USR_ILLEGAL_ARGUMENT,
data: vec![],
});
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}

Expand Down Expand Up @@ -772,12 +757,12 @@ impl Actor {
},
));
sector_deal_ids.push(deal_id);
pieces_ret.push(PieceReturn { code: ExitCode::OK, data: vec![] });
pieces_ret.push(ext::miner::PieceReturn { accepted: true });
}

sectors_deals.push((sector.sector, SectorDealIDs { deals: sector_deal_ids }));
assert_eq!(pieces_ret.len(), sector.added.len(), "mismatched piece returns");
sectors_ret.push(SectorReturn { added: pieces_ret });
sectors_ret.push(ext::miner::SectorReturn { added: pieces_ret });
}
st.put_deal_states(rt.store(), &deal_states)?;
st.put_sector_deal_ids(rt.store(), &miner_addr, &sectors_deals)?;
Expand All @@ -787,7 +772,7 @@ impl Actor {
Ok(sectors_ret)
})?;

Ok(SectorContentChangedReturn { sectors: sectors_ret })
Ok(ext::miner::SectorContentChangedReturn { sectors: sectors_ret })
}

/// Terminate a set of deals in response to their containing sector being terminated.
Expand Down
2 changes: 1 addition & 1 deletion actors/market/tests/activate_deal_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,5 @@ fn assert_activation_failure(
added: vec![piece],
}];
let ret = sector_content_changed(rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(vec![PieceReturn { code: exit, data: vec![] }], ret.sectors[0].added);
assert_eq!(vec![PieceReturn { accepted: false }], ret.sectors[0].added);
}
6 changes: 3 additions & 3 deletions actors/market/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{cell::RefCell, collections::HashMap};

use fil_actor_market::ext::account::{AuthenticateMessageParams, AUTHENTICATE_MESSAGE_METHOD};
use fil_actor_market::ext::miner::{
PieceInfo, SectorChanges, SectorContentChangedParams, SectorContentChangedReturn,
PieceChange, SectorChanges, SectorContentChangedParams, SectorContentChangedReturn,
};
use fil_actor_market::ext::verifreg::{AllocationID, AllocationRequest, AllocationsResponse};
use fil_actor_market::{
Expand Down Expand Up @@ -1255,8 +1255,8 @@ where
ret
}

pub fn piece_info_from_deal(id: DealID, deal: &DealProposal) -> PieceInfo {
PieceInfo {
pub fn piece_info_from_deal(id: DealID, deal: &DealProposal) -> PieceChange {
PieceChange {
data: deal.piece_cid,
size: deal.piece_size,
payload: serialize(&id, "deal id").unwrap(),
Expand Down
56 changes: 22 additions & 34 deletions actors/market/tests/sector_content_changed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use multihash::MultihashDigest;
use num_traits::Zero;

use fil_actor_market::ext::miner::{
PieceInfo, PieceReturn, SectorChanges, SectorContentChangedParams,
PieceChange, PieceReturn, SectorChanges, SectorContentChangedParams,
};
use fil_actor_market::{DealProposal, Method, NO_ALLOCATION_ID};
use fil_actors_runtime::cbor::serialize;
Expand Down Expand Up @@ -76,7 +76,7 @@ fn simple_one_sector() {
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(1, ret.sectors.len());
assert_eq!(3, ret.sectors[0].added.len());
assert!(ret.sectors[0].added.iter().all(|r| r.code == ExitCode::OK));
assert!(ret.sectors[0].added.iter().all(|r| r.accepted));

// Deal IDs are stored under the sector, in correct order.
assert_eq!(deal_ids, get_sector_deal_ids(&rt, &PROVIDER_ADDR, sno));
Expand Down Expand Up @@ -124,9 +124,9 @@ fn simple_multiple_sectors() {
];
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(3, ret.sectors.len());
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] }], ret.sectors[0].added);
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] }], ret.sectors[1].added);
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] }], ret.sectors[2].added);
assert_eq!(vec![PieceReturn { accepted: true }], ret.sectors[0].added);
assert_eq!(vec![PieceReturn { accepted: true }], ret.sectors[1].added);
assert_eq!(vec![PieceReturn { accepted: true }], ret.sectors[2].added);

// Deal IDs are stored under the right sector, in correct order.
assert_eq!(deal_ids[0..2], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 1));
Expand Down Expand Up @@ -174,7 +174,7 @@ fn piece_must_match_deal() {
// Wrong size
pieces[1].size = PaddedPieceSize(1234);
// Deal doesn't exist
pieces.push(PieceInfo {
pieces.push(PieceChange {
data: Cid::new_v1(0, Sha2_256.digest(&[1, 2, 3, 4])),
size: PaddedPieceSize(1234),
payload: serialize(&1234, "deal id").unwrap(),
Expand All @@ -186,9 +186,9 @@ fn piece_must_match_deal() {
assert_eq!(1, ret.sectors.len());
assert_eq!(
vec![
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::USR_NOT_FOUND, data: vec![] },
PieceReturn { accepted: false },
PieceReturn { accepted: false },
PieceReturn { accepted: false },
],
ret.sectors[0].added
);
Expand All @@ -214,10 +214,7 @@ fn invalid_deal_id_rejected() {
vec![SectorChanges { sector: 1, minimum_commitment_epoch: END_EPOCH + 10, added: pieces }];
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(1, ret.sectors.len());
assert_eq!(
vec![PieceReturn { code: ExitCode::USR_SERIALIZATION, data: vec![] },],
ret.sectors[0].added
);
assert_eq!(vec![PieceReturn { accepted: false },], ret.sectors[0].added);

check_state(&rt);
}
Expand Down Expand Up @@ -256,19 +253,13 @@ fn failures_isolated() {
assert_eq!(3, ret.sectors.len());
// Broken second piece still allows first piece in same sector to activate.
assert_eq!(
vec![
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] }
],
vec![PieceReturn { accepted: true }, PieceReturn { accepted: false }],
ret.sectors[0].added
);
// Broken third piece
assert_eq!(
vec![PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] }],
ret.sectors[1].added
);
assert_eq!(vec![PieceReturn { accepted: false }], ret.sectors[1].added);
// Ok fourth piece.
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] }], ret.sectors[2].added);
assert_eq!(vec![PieceReturn { accepted: true }], ret.sectors[2].added);

// Successful deal IDs are stored under the right sector, in correct order.
assert_eq!(deal_ids[0..1], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 1));
Expand Down Expand Up @@ -297,9 +288,9 @@ fn rejects_duplicates_in_same_sector() {
// The first piece succeeds just once, the second piece succeeds too.
assert_eq!(
vec![
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { accepted: true },
PieceReturn { accepted: false },
PieceReturn { accepted: true },
],
ret.sectors[0].added
);
Expand Down Expand Up @@ -340,21 +331,18 @@ fn rejects_duplicates_across_sectors() {
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(3, ret.sectors.len());
// Succeeds in the first time.
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] },], ret.sectors[0].added);
assert_eq!(vec![PieceReturn { accepted: true },], ret.sectors[0].added);
// Fails second time, but other piece succeeds.
assert_eq!(
vec![
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
],
vec![PieceReturn { accepted: false }, PieceReturn { accepted: true },],
ret.sectors[1].added
);
// Both duplicates fail, but third piece succeeds.
assert_eq!(
vec![
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { accepted: false },
PieceReturn { accepted: false },
PieceReturn { accepted: true },
],
ret.sectors[2].added
);
Expand Down Expand Up @@ -387,7 +375,7 @@ fn create_deals(rt: &MockRuntime, count: i64) -> Vec<DealProposal> {
.collect()
}

fn pieces_from_deals(deal_ids: &[DealID], deals: &[DealProposal]) -> Vec<PieceInfo> {
fn pieces_from_deals(deal_ids: &[DealID], deals: &[DealProposal]) -> Vec<PieceChange> {
deal_ids.iter().zip(deals).map(|(id, deal)| piece_info_from_deal(*id, deal)).collect()
}

Expand Down
Loading

0 comments on commit 351a15a

Please sign in to comment.