Skip to content

Commit

Permalink
0.7.5 mark fragment (#258)
Browse files Browse the repository at this point in the history
* refactor: upload file function refactor

* refactor: deal info struct refactor
  • Loading branch information
ytqaljn committed Nov 9, 2023
1 parent a3685f7 commit 6b61e8a
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 148 deletions.
Empty file.
41 changes: 13 additions & 28 deletions c-pallets/file-bank/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,31 @@ impl<T: Config> Pallet<T> {
pub fn generate_file(
file_hash: &Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
complete_list: BoundedVec<CompleteInfo<T>, T::FragmentCount>,
user_brief: UserBrief<T>,
stat: FileState,
file_size: u128,
) -> DispatchResult {
let mut segment_info_list: BoundedVec<SegmentInfo<T>, T::SegmentCount> = Default::default();
ensure!(complete_list.len() == FRAGMENT_COUNT as usize, Error::<T>::Unexpected);
for segment in deal_info.iter() {
let mut segment_info = SegmentInfo::<T> {
hash: segment.hash,
fragment_list: Default::default(),
};

for miner_task in &mut miner_task_list {
miner_task.fragment_list.sort();
}

for frag_hash in segment.fragment_list.iter() {
for miner_task in &mut miner_task_list {
if let Ok(index) = miner_task.fragment_list.binary_search(&frag_hash) {
let miner = miner_task.miner.clone().ok_or(Error::<T>::Unexpected)?;
let frag_info = FragmentInfo::<T> {
hash: *frag_hash,
avail: true,
miner: miner.clone(),
};
segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
miner_task.fragment_list.remove(index);
break;
}
}
for (index, fragment_hash) in segment.fragment_list.iter().enumerate() {
let frag_info = FragmentInfo::<T> {
hash: *fragment_hash,
avail: true,
miner: complete_list[index as usize].miner.clone(),
};

segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
}

segment_info_list.try_push(segment_info).map_err(|_e| Error::<T>::BoundedVecError)?;
}

let cur_block = <frame_system::Pallet<T>>::block_number();

let file_info = FileInfo::<T> {
Expand Down Expand Up @@ -111,7 +101,6 @@ impl<T: Config> Pallet<T> {
pub(super) fn generate_deal(
file_hash: Hash,
file_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
Expand All @@ -127,7 +116,6 @@ impl<T: Config> Pallet<T> {
file_size,
segment_list: file_info.clone(),
user: user_brief,
miner_task_list: miner_task_list,
complete_list: Default::default(),
};

Expand Down Expand Up @@ -179,11 +167,8 @@ impl<T: Config> Pallet<T> {
let needed_space = Self::cal_file_size(deal_info.segment_list.len() as u128);
T::StorageHandle::unlock_user_space(&deal_info.user.user, needed_space)?;
// unlock mienr space
for miner_task in deal_info.miner_task_list {
if let Some(miner) = miner_task.miner {
let count = miner_task.fragment_list.len() as u128;
T::MinerControl::unlock_space(&miner, FRAGMENT_SIZE * count)?;
}
for complete_info in deal_info.complete_list {
T::MinerControl::unlock_space(&complete_info.miner, FRAGMENT_SIZE * FRAGMENT_COUNT as u128)?;
}

<DealMap<T>>::remove(deal_hash);
Expand Down
37 changes: 37 additions & 0 deletions c-pallets/file-bank/src/impls/dealimpl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::*;

impl<T: Config> DealInfo<T> {
pub fn complete_part(&mut self, miner: AccountOf<T>, index: u8) -> DispatchResult {
for complete_info in &self.complete_list {
ensure!(index != complete_info.index, Error::<T>::Existed);
ensure!(miner != complete_info.miner, Error::<T>::Existed);
}

let complete_info = CompleteInfo::<T> {
index,
miner,
};

self.complete_list.try_push(complete_info).map_err(|_| Error::<T>::BoundedVecError)?;

Ok(())
}

pub fn completed_all(&mut self) -> DispatchResult {
self.stage = 2;
for complete_info in self.complete_list.iter() {
<PendingReplacements<T>>::try_mutate(&complete_info.miner, |pending_space| -> DispatchResult {
let replace_space = FRAGMENT_SIZE
.checked_mul(self.segment_list.len() as u128)
.ok_or(Error::<T>::Overflow)?;

*pending_space = pending_space
.checked_add(replace_space).ok_or(Error::<T>::Overflow)?;

Ok(())
})?;
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions c-pallets/file-bank/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod receptionist;
pub use receptionist::*;

pub mod dealimpl;
pub use dealimpl::*;
82 changes: 82 additions & 0 deletions c-pallets/file-bank/src/impls/receptionist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::*;

pub struct Receptionist<T: Config>(PhantomData<T>);

impl<T: Config> Receptionist<T> {
pub fn fly_upload_file(file_hash: Hash, user_brief: UserBrief<T>, used_space: u128) -> DispatchResult {
T::StorageHandle::update_user_space(&user_brief.user, 1, used_space)?;

if <Bucket<T>>::contains_key(&user_brief.user, &user_brief.bucket_name) {
Pallet::<T>::add_file_to_bucket(&user_brief.user, &user_brief.bucket_name, &file_hash)?;
} else {
Pallet::<T>::create_bucket_helper(&user_brief.user, &user_brief.bucket_name, Some(file_hash))?;
}

Pallet::<T>::add_user_hold_fileslice(&user_brief.user, file_hash, used_space)?;

<File<T>>::try_mutate(&file_hash, |file_opt| -> DispatchResult {
let file = file_opt.as_mut().ok_or(Error::<T>::FileNonExistent)?;
file.owner.try_push(user_brief.clone()).map_err(|_e| Error::<T>::BoundedVecError)?;
Ok(())
})?;

Ok(())
}

pub fn generate_deal(
file_hash: Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
user_brief: UserBrief<T>,
needed_space: u128,
file_size: u128,
) -> DispatchResult {
T::StorageHandle::lock_user_space(&user_brief.user, needed_space)?;
// TODO! Replace the file_hash param
Pallet::<T>::generate_deal(file_hash.clone(), deal_info, user_brief.clone(), file_size)?;

Ok(())
}

pub fn qualification_report_processing(sender: AccountOf<T>, deal_hash: Hash, deal_info: &mut DealInfo<T>, index: u8) -> DispatchResult {
deal_info.complete_part(sender.clone(), index)?;

// If it is the last submitter of the order.
if deal_info.complete_list.len() == FRAGMENT_COUNT as usize {
deal_info.completed_all()?;
Pallet::<T>::generate_file(
&deal_hash,
deal_info.segment_list.clone(),
deal_info.complete_list.clone(),
deal_info.user.clone(),
FileState::Calculate,
deal_info.file_size,
)?;

let segment_count = deal_info.segment_list.len();
let needed_space = Pallet::<T>::cal_file_size(segment_count as u128);
T::StorageHandle::unlock_and_used_user_space(&deal_info.user.user, needed_space)?;
T::StorageHandle::sub_total_idle_space(needed_space)?;
T::StorageHandle::add_total_service_space(needed_space)?;
let result = T::FScheduler::cancel_named(deal_hash.0.to_vec()).map_err(|_| Error::<T>::Unexpected);
if let Err(_) = result {
log::info!("transfer report cancel schedule failed: {:?}", deal_hash.clone());
}
// Calculate the maximum time required for storage nodes to tag files
let max_needed_cal_space = (segment_count as u32).checked_mul(FRAGMENT_SIZE as u32).ok_or(Error::<T>::Overflow)?;
let mut life: u32 = (max_needed_cal_space / TRANSFER_RATE as u32).checked_add(11).ok_or(Error::<T>::Overflow)?;
life = (max_needed_cal_space / CALCULATE_RATE as u32)
.checked_add(30).ok_or(Error::<T>::Overflow)?
.checked_add(life).ok_or(Error::<T>::Overflow)?;
Pallet::<T>::start_second_task(deal_hash.0.to_vec(), deal_hash, life)?;
if <Bucket<T>>::contains_key(&deal_info.user.user, &deal_info.user.bucket_name) {
Pallet::<T>::add_file_to_bucket(&deal_info.user.user, &deal_info.user.bucket_name, &deal_hash)?;
} else {
Pallet::<T>::create_bucket_helper(&deal_info.user.user, &deal_info.user.bucket_name, Some(deal_hash))?;
}
Pallet::<T>::add_user_hold_fileslice(&deal_info.user.user, deal_hash.clone(), needed_space)?;
Pallet::<T>::deposit_event(Event::<T>::StorageCompleted{ file_hash: deal_hash });
}

Ok(())
}
}
132 changes: 14 additions & 118 deletions c-pallets/file-bank/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ mod functions;
mod constants;
use constants::*;

mod impls;
use impls::receptionist::Receptionist;

use codec::{Decode, Encode};
use frame_support::{
// bounded_vec,
Expand Down Expand Up @@ -423,10 +426,6 @@ pub mod pallet {
origin: OriginFor<T>,
file_hash: Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
assigned_data: BoundedVec<
BoundedVec<Hash, T::MissionCount>,
ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>,
>,
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
Expand All @@ -436,7 +435,7 @@ pub mod pallet {
// Check file specifications.
ensure!(Self::check_file_spec(&deal_info), Error::<T>::SpecError);
// Check whether the user-defined name meets the rules.

let minimum = T::NameMinLength::get();
ensure!(user_brief.file_name.len() as u32 >= minimum, Error::<T>::SpecError);
ensure!(user_brief.bucket_name.len() as u32 >= minimum, Error::<T>::SpecError);
Expand All @@ -448,38 +447,9 @@ pub mod pallet {
ensure!(T::StorageHandle::get_user_avail_space(&user_brief.user)? > needed_space, Error::<T>::InsufficientAvailableSpace);

if <File<T>>::contains_key(&file_hash) {
T::StorageHandle::update_user_space(&user_brief.user, 1, needed_space)?;

if <Bucket<T>>::contains_key(&user_brief.user, &user_brief.bucket_name) {
Self::add_file_to_bucket(&user_brief.user, &user_brief.bucket_name, &file_hash)?;
} else {
Self::create_bucket_helper(&user_brief.user, &user_brief.bucket_name, Some(file_hash))?;
}

Self::add_user_hold_fileslice(&user_brief.user, file_hash, needed_space)?;

<File<T>>::try_mutate(&file_hash, |file_opt| -> DispatchResult {
let file = file_opt.as_mut().ok_or(Error::<T>::FileNonExistent)?;
file.owner.try_push(user_brief.clone()).map_err(|_e| Error::<T>::BoundedVecError)?;
Ok(())
})?;
Receptionist::<T>::fly_upload_file(file_hash, user_brief.clone(), needed_space)?;
} else {
ensure!(assigned_data.len() as u32 == ASSIGN_MINER_IDEAL_QUANTITY, Error::<T>::SpecError);
let mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>> = Default::default();
let mut index = 1;
for fragment_list in assigned_data {
ensure!(fragment_list.len() == deal_info.len(), Error::<T>::SpecError);
let miner_task = MinerTaskList::<T> {
index,
miner: None,
fragment_list,
};
miner_task_list.try_push(miner_task).map_err(|_| Error::<T>::BoundedVecError)?;
index += 1;
}
T::StorageHandle::lock_user_space(&user_brief.user, needed_space)?;
// TODO! Replace the file_hash param
Self::generate_deal(file_hash.clone(), deal_info, miner_task_list, user_brief.clone(), file_size)?;
Receptionist::<T>::generate_deal(file_hash, deal_info, user_brief.clone(), needed_space, file_size)?;
}

Self::deposit_event(Event::<T>::UploadDeclaration { operator: sender, owner: user_brief.user, deal_hash: file_hash });
Expand Down Expand Up @@ -619,81 +589,7 @@ pub mod pallet {
<DealMap<T>>::try_mutate(&deal_hash, |deal_info_opt| -> DispatchResult {
// can use unwrap because there was a judgment above
let deal_info = deal_info_opt.as_mut().unwrap();

if !deal_info.complete_list.contains(&index) {
for task_info in &mut deal_info.miner_task_list {
if task_info.index == index {
match task_info.miner {
Some(_) => Err(Error::<T>::Existed)?,
None => {
task_info.miner = Some(sender.clone());
T::MinerControl::lock_space(&sender, task_info.fragment_list.len() as u128 * FRAGMENT_SIZE)?;
deal_info.complete_list.try_push(index).map_err(|_| Error::<T>::BoundedVecError)?;
},
};
} else {
if let Some(miner) = &task_info.miner {
if miner == &sender {
Err(Error::<T>::Existed)?;
}
};
}
}


// If it is the last submitter of the order.
if deal_info.complete_list.len() == deal_info.miner_task_list.len() {
deal_info.stage = 2;
Self::generate_file(
&deal_hash,
deal_info.segment_list.clone(),
deal_info.miner_task_list.clone(),
deal_info.user.clone(),
FileState::Calculate,
deal_info.file_size,
)?;
let mut max_task_count = 0;
for miner_task in deal_info.miner_task_list.iter() {
let count = miner_task.fragment_list.len() as u128;
if count > max_task_count {
max_task_count = count;
}
// Miners need to report the replaced documents themselves.
// If a challenge is triggered before the report is completed temporarily,
// these documents to be replaced also need to be verified
if let Some(miner) = &miner_task.miner {
<PendingReplacements<T>>::try_mutate(miner, |pending_space| -> DispatchResult {
let replace_space = FRAGMENT_SIZE.checked_mul(count).ok_or(Error::<T>::Overflow)?;
let pending_space_temp = pending_space.checked_add(replace_space).ok_or(Error::<T>::Overflow)?;
*pending_space = pending_space_temp;
Ok(())
})?;
}
}
let needed_space = Self::cal_file_size(deal_info.segment_list.len() as u128);
T::StorageHandle::unlock_and_used_user_space(&deal_info.user.user, needed_space)?;
T::StorageHandle::sub_total_idle_space(needed_space)?;
T::StorageHandle::add_total_service_space(needed_space)?;
let result = T::FScheduler::cancel_named(deal_hash.0.to_vec()).map_err(|_| Error::<T>::Unexpected);
if let Err(_) = result {
log::info!("transfer report cancel schedule failed: {:?}", deal_hash.clone());
}
// Calculate the maximum time required for storage nodes to tag files
let max_needed_cal_space = (max_task_count as u32).checked_mul(FRAGMENT_SIZE as u32).ok_or(Error::<T>::Overflow)?;
let mut life: u32 = (max_needed_cal_space / TRANSFER_RATE as u32).checked_add(11).ok_or(Error::<T>::Overflow)?;
life = (max_needed_cal_space / CALCULATE_RATE as u32)
.checked_add(30).ok_or(Error::<T>::Overflow)?
.checked_add(life).ok_or(Error::<T>::Overflow)?;
Self::start_second_task(deal_hash.0.to_vec(), deal_hash, life)?;
if <Bucket<T>>::contains_key(&deal_info.user.user, &deal_info.user.bucket_name) {
Self::add_file_to_bucket(&deal_info.user.user, &deal_info.user.bucket_name, &deal_hash)?;
} else {
Self::create_bucket_helper(&deal_info.user.user, &deal_info.user.bucket_name, Some(deal_hash))?;
}
Self::add_user_hold_fileslice(&deal_info.user.user, deal_hash.clone(), needed_space)?;
Self::deposit_event(Event::<T>::StorageCompleted{ file_hash: deal_hash });
}
}
Receptionist::<T>::qualification_report_processing(sender.clone(), deal_hash, deal_info, index)?;
Ok(())
})?;

Expand All @@ -719,18 +615,18 @@ pub mod pallet {
let _ = ensure_root(origin)?;

let deal_info = <DealMap<T>>::try_get(&deal_hash).map_err(|_| Error::<T>::NonExistent)?;
for miner_task in deal_info.miner_task_list {
let count = miner_task.fragment_list.len() as u32;
let mut hash_list: Vec<Box<[u8; 256]>> = Default::default();
for fragment_hash in miner_task.fragment_list {
for (index, complete_info) in deal_info.complete_list.iter().enumerate() {
let count = FRAGMENT_COUNT;
let mut hash_list: Vec<Box<[u8; 256]>> = Default::default();
for segment in &deal_info.segment_list {
let fragment_hash = segment.fragment_list[index as usize];
let hash_temp = fragment_hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
hash_list.push(hash_temp);
}
// Accumulate the number of fragments stored by each miner
let unlock_space = FRAGMENT_SIZE.checked_mul(count as u128).ok_or(Error::<T>::Overflow)?;
let miner = miner_task.miner.ok_or(Error::<T>::Unexpected)?;
T::MinerControl::unlock_space_to_service(&miner, unlock_space)?;
T::MinerControl::insert_service_bloom(&miner, hash_list)?;
T::MinerControl::unlock_space_to_service(&complete_info.miner, unlock_space)?;
T::MinerControl::insert_service_bloom(&complete_info.miner, hash_list)?;
}

<File<T>>::try_mutate(&deal_hash, |file_opt| -> DispatchResult {
Expand Down
Loading

0 comments on commit 6b61e8a

Please sign in to comment.