Skip to content

Commit

Permalink
Response to review on #1486 (#1499)
Browse files Browse the repository at this point in the history
Response to review on #1486
  • Loading branch information
anorth committed Jan 22, 2024
1 parent 049b9c7 commit 84b0c00
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 108 deletions.
71 changes: 35 additions & 36 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ impl Actor {
let commd = if sector.deal_ids.is_empty() {
None
} else {
let proposals_iter = &mut sector_proposals.iter().map(|(_, p)| p);
let proposals_iter = sector_proposals.iter().map(|(_, p)| p);
Some(compute_data_commitment(rt, proposals_iter, sector.sector_type)?)
};

Expand Down Expand Up @@ -554,25 +554,31 @@ impl Actor {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<SectorDealActivation> = vec![];
let mut activated_deals: HashSet<DealID> = HashSet::new();
let mut activated_deals = BTreeSet::<DealID>::new();
let mut sectors_deals: Vec<(SectorNumber, SectorDealIDs)> = vec![];

'sector: for sector in params.sectors {
let mut sector_deal_ids: HashSet<DealID> = HashSet::new();
let mut sector_deal_ids = sector.deal_ids.clone();
sector_deal_ids.sort();
if sector_deal_ids.windows(2).any(|w| w[0] == w[1]) {
log::warn!("failed to activate sector, duplicate deal");
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
continue;
}
let mut validated_proposals = vec![];
// Iterate once to validate all the requested deals.
// If a deal fails, skip the whole sector.
for deal_id in &sector.deal_ids {
for &deal_id in &sector.deal_ids {
// Check each deal is present only once, within and across sectors.
if sector_deal_ids.contains(deal_id) || activated_deals.contains(deal_id) {
if activated_deals.contains(&deal_id) {
log::warn!("failed to activate sector, duplicated deal {}", deal_id);
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
continue 'sector;
}

let proposal = match preactivate_deal(
rt,
*deal_id,
deal_id,
&proposals,
&states,
&pending_deals,
Expand All @@ -588,12 +594,11 @@ impl Actor {
continue 'sector;
}
};
sector_deal_ids.insert(*deal_id);
validated_proposals.push(proposal);
}

let mut verified_infos = vec![];
let mut nonverified_deal_space: BigInt = BigInt::zero();
let mut nonverified_deal_space = BigInt::zero();
// Given that all deals validated, prepare the state updates for them all.
// There's no continue below here to ensure updates are consistent.
// Any error must abort.
Expand Down Expand Up @@ -627,8 +632,7 @@ impl Actor {
}

let data_commitment = if params.compute_cid && !sector.deal_ids.is_empty() {
let proposal_iter = &mut validated_proposals.iter();
Some(compute_data_commitment(rt, proposal_iter, sector.sector_type)?)
Some(compute_data_commitment(rt, &validated_proposals, sector.sector_type)?)
} else {
None
};
Expand All @@ -644,7 +648,7 @@ impl Actor {
}

st.put_deal_states(rt.store(), &deal_states)?;
st.put_sector_deal_ids(rt.store(), &miner_addr, &sectors_deals)?;
st.put_sector_deal_ids(rt.store(), miner_addr.id().unwrap(), &sectors_deals)?;
st.save_pending_deal_allocation_ids(&mut pending_deal_allocation_ids)?;
Ok((activations, batch_gen.gen()))
})?;
Expand Down Expand Up @@ -677,19 +681,18 @@ impl Actor {

for sector in &params.sectors {
let mut sector_deal_ids: Vec<DealID> = vec![];
let mut pieces_ret: Vec<ext::miner::PieceReturn> = vec![];
for piece in &sector.added {
let mut pieces_ret: Vec<_> =
vec![ext::miner::PieceReturn { accepted: false }; sector.added.len()];
for (piece, ret) in sector.added.iter().zip(&mut pieces_ret) {
let deal_id: DealID = match deserialize(&piece.payload, "deal id") {
Ok(v) => v,
Err(e) => {
log::warn!("failed to deserialize deal id {:?}: {}", piece.payload, e);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
};
if activated_deals.contains(&deal_id) {
log::warn!("duplicated deal {}", deal_id);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}

Expand All @@ -707,7 +710,6 @@ impl Actor {
Ok(id) => id,
Err(e) => {
log::warn!("failed to activate deal {}: {}", deal_id, e);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
};
Expand All @@ -719,7 +721,6 @@ impl Actor {
piece.data,
proposal.piece_cid
);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}
if piece.size != proposal.piece_size {
Expand All @@ -729,17 +730,14 @@ impl Actor {
piece.size.0,
proposal.piece_size.0
);
pieces_ret.push(ext::miner::PieceReturn { accepted: false });
continue;
}

// No continue below here, to ensure state changes are consistent.
activated_deals.insert(deal_id);

// Remove any verified allocation ID for the pending deal.
// The allocation ID is only used in BatchActivateDeals
// (where it's returned to the caller).
pending_deal_allocation_ids.delete(&deal_id)?.unwrap_or(NO_ALLOCATION_ID);
pending_deal_allocation_ids.delete(&deal_id)?;

deal_states.push((
deal_id,
Expand All @@ -751,15 +749,15 @@ impl Actor {
},
));
sector_deal_ids.push(deal_id);
pieces_ret.push(ext::miner::PieceReturn { accepted: true });
ret.accepted = true;
}

sectors_deals.push((sector.sector, SectorDealIDs { deals: sector_deal_ids }));
assert_eq!(pieces_ret.len(), sector.added.len(), "mismatched piece returns");
sectors_ret.push(ext::miner::SectorReturn { added: pieces_ret });
}
st.put_deal_states(rt.store(), &deal_states)?;
st.put_sector_deal_ids(rt.store(), &miner_addr, &sectors_deals)?;
st.put_sector_deal_ids(rt.store(), miner_addr.id().unwrap(), &sectors_deals)?;
st.save_pending_deal_allocation_ids(&mut pending_deal_allocation_ids)?;

assert_eq!(sectors_ret.len(), params.sectors.len(), "mismatched sector returns");
Expand All @@ -782,16 +780,15 @@ impl Actor {
rt.transaction(|st: &mut State, rt| {
// The sector deals mapping is removed all at once.
// Other deal clean-up is deferred to cron.
let sector_deals =
st.pop_sector_deal_ids(rt.store(), &miner_addr, params.sectors.iter())?;
let all_deal_ids = sector_deals
.iter()
.flat_map(|(_, deal_ids)| deal_ids.deals.iter())
.collect::<Vec<_>>();
let all_deal_ids = st.pop_sector_deal_ids(
rt.store(),
miner_addr.id().unwrap(),
params.sectors.iter(),
)?;

let mut deal_states: Vec<(DealID, DealState)> = vec![];
for id in all_deal_ids {
let deal = st.find_proposal(rt.store(), *id)?;
let deal = st.find_proposal(rt.store(), id)?;
// The deal may have expired and been deleted before the sector is terminated.
// Nothing to do, but continue execution for the other deals.
if deal.is_none() {
Expand All @@ -817,7 +814,7 @@ impl Actor {
}

let mut state: DealState = st
.find_deal_state(rt.store(), *id)?
.find_deal_state(rt.store(), id)?
// A deal with a proposal but no state is not activated, but then it should not be
// part of a sector that is terminating.
.ok_or_else(|| actor_error!(illegal_argument, "no state for deal {}", id))?;
Expand All @@ -832,7 +829,7 @@ impl Actor {
// and slashing of provider collateral happens in cron_tick.
state.slash_epoch = params.epoch;

deal_states.push((*id, state));
deal_states.push((id, state));
}

st.put_deal_states(rt.store(), &deal_states)?;
Expand All @@ -850,7 +847,7 @@ impl Actor {
rt.transaction(|st: &mut State, rt| {
let last_cron = st.last_cron;
let mut provider_deals_to_remove: BTreeMap<
Address,
ActorID,
BTreeMap<SectorNumber, Vec<DealID>>,
> = BTreeMap::new();
let mut new_updates_scheduled: BTreeMap<ChainEpoch, Vec<DealID>> = BTreeMap::new();
Expand Down Expand Up @@ -938,8 +935,10 @@ impl Actor {
// Delete proposal and state simultaneously.
let deleted = st.remove_deal_state(rt.store(), deal_id)?;
if let Some(deleted) = deleted {
// All proposals are stored with normalised addresses.
let provider = deal.provider.id().unwrap();
provider_deals_to_remove
.entry(deal.provider)
.entry(provider)
.or_default()
.entry(deleted.sector_number)
.or_default()
Expand Down Expand Up @@ -1190,9 +1189,9 @@ fn get_proposals<BS: Blockstore>(
Ok(proposals)
}

fn compute_data_commitment(
fn compute_data_commitment<'a>(
rt: &impl Runtime,
proposals: &mut dyn Iterator<Item = &DealProposal>,
proposals: impl IntoIterator<Item = &'a DealProposal>,
sector_type: RegisteredSealProof,
) -> Result<Cid, ActorError> {
let mut pieces = vec![];
Expand Down
Loading

0 comments on commit 84b0c00

Please sign in to comment.