Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix plotting getting stuck #3200

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesOrdered;
use futures::{select, FutureExt, SinkExt, StreamExt};
Expand Down Expand Up @@ -123,8 +123,30 @@ where
max_plotting_sectors_per_farm,
} = plotting_options;

let sector_plotting_options = &sector_plotting_options;
let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
let mut sectors_being_plotted = FuturesOrdered::new();
// Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting
// semaphore in practice due to permit stored in `SectorPlottingResult`
let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
let process_plotting_result_fut = async move {
while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
process_plotting_result(
sector_plotting_result,
sectors_metadata,
sectors_being_modified,
&mut metadata_header,
Arc::clone(&sector_plotting_options.metadata_file),
)
.await?;
}

unreachable!(
"Stream will not end before the rest of the plotting process is shutting down"
);
};
let process_plotting_result_fut = process_plotting_result_fut.fuse();
let mut process_plotting_result_fut = pin!(process_plotting_result_fut);

// Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that
// already started plotting to finish plotting and then update metadata header
Expand All @@ -138,7 +160,7 @@ where
let sector_index = sector_to_plot.sector_index;
let sector_plotting_init_fut = plot_single_sector(
sector_to_plot,
&sector_plotting_options,
sector_plotting_options,
sectors_metadata,
sectors_being_modified,
&plotting_semaphore,
Expand Down Expand Up @@ -168,25 +190,23 @@ where
break;
}
maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
process_plotting_result(
maybe_sector_plotting_result?,
sectors_metadata,
sectors_being_modified,
&mut metadata_header,
Arc::clone(&sector_plotting_options.metadata_file)
).await?;
sector_plotting_result_sender
.unbounded_send(maybe_sector_plotting_result?)
.expect("Sending means receiver is not dropped yet; qed");
}
result = process_plotting_result_fut => {
return result;
}
}
}
}
maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
process_plotting_result(
maybe_sector_plotting_result?,
sectors_metadata,
sectors_being_modified,
&mut metadata_header,
Arc::clone(&sector_plotting_options.metadata_file)
).await?;
sector_plotting_result_sender
.unbounded_send(maybe_sector_plotting_result?)
.expect("Sending means receiver is not dropped yet; qed");
}
result = process_plotting_result_fut => {
return result;
}
}
}
Expand All @@ -195,7 +215,7 @@ where
}

async fn process_plotting_result(
sector_plotting_result: SectorPlottingResult,
sector_plotting_result: SectorPlottingResult<'_>,
sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
metadata_header: &mut PlotMetadataHeader,
Expand All @@ -205,6 +225,7 @@ async fn process_plotting_result(
sector_metadata,
replotting,
last_queued,
plotting_permit,
} = sector_plotting_result;

let sector_index = sector_metadata.sector_index;
Expand Down Expand Up @@ -241,6 +262,8 @@ async fn process_plotting_result(
}
}

drop(plotting_permit);

Ok(())
}

Expand All @@ -250,10 +273,11 @@ enum PlotSingleSectorResult<F> {
FatalError(PlottingError),
}

struct SectorPlottingResult {
struct SectorPlottingResult<'a> {
sector_metadata: SectorMetadataChecksummed,
replotting: bool,
last_queued: bool,
plotting_permit: SemaphoreGuard<'a>,
}

async fn plot_single_sector<'a, NC>(
Expand All @@ -262,7 +286,9 @@ async fn plot_single_sector<'a, NC>(
sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
plotting_semaphore: &'a Semaphore,
) -> PlotSingleSectorResult<impl Future<Output = Result<SectorPlottingResult, PlottingError>> + 'a>
) -> PlotSingleSectorResult<
impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
>
where
NC: NodeClient,
{
Expand Down Expand Up @@ -478,12 +504,11 @@ where
.sector_update
.call_simple(&(sector_index, sector_state));

drop(plotting_permit);

Ok(SectorPlottingResult {
sector_metadata,
replotting,
last_queued,
plotting_permit,
})
})
}
Expand Down
Loading