Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Upgrade sdk to gemini-3g-2024-jan-08 #237

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 262 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ edition = "2021"

[dependencies]
sdk-dsn = { path = "dsn" }
sdk-farmer = { path = "farmer" }
sdk-farmer = { path = "farmer", default-features = false }
sdk-node = { path = "node" }
sdk-substrate = { path = "substrate" }
sdk-utils = { path = "utils" }
static_assertions = "1.1.0"

subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }

# The only triple tested and confirmed as working in `jemallocator` crate is `x86_64-unknown-linux-gnu`
[target.'cfg(all(target_arch = "x86_64", target_vendor = "unknown", target_os = "linux", target_env = "gnu"))'.dev-dependencies]
Expand All @@ -28,7 +28,7 @@ derive_more = "0.99"
fdlimit = "0.2"
futures = "0.3"
serde_json = "1"
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
tempfile = "3"
tokio = { version = "1.34.0", features = ["rt-multi-thread", "macros"] }
tracing = "0.1"
Expand Down Expand Up @@ -150,7 +150,10 @@ members = [
]

[features]
default = []
default = ["numa"]
numa = [
"sdk-farmer/numa",
]
integration-test = [
"sdk-utils/integration-test",
"sdk-dsn/integration-test",
Expand Down
6 changes: 3 additions & 3 deletions dsn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ hex = "0.4.3"
parking_lot = "0.12"
prometheus-client = "0.22.0"
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
sdk-utils = { path = "../utils" }
serde = { version = "1", features = ["derive"] }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", default-features = false }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
tracing = "0.1"

[features]
Expand Down
20 changes: 12 additions & 8 deletions farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,31 @@ derive_builder = "0.12"
derive_more = "0.99"
futures = "0.3"
lru = "0.11.0"
libmimalloc-sys = { version = "0.1.35", features = ["extended"] }
parking_lot = "0.12"
pin-project = "1"
rayon = "1.7.0"
sdk-traits = { path = "../traits" }
sdk-utils = { path = "../utils" }
serde = { version = "1", features = ["derive"] }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf", features = ["parallel"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", default-features = false }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", features = ["parallel"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
thiserror = "1"
tokio = { version = "1.34.0", features = ["fs", "rt", "tracing", "macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio-stream = { version = "0.1", features = ["sync", "time"] }
tracing = "0.1"
tracing-futures = "0.2"

[features]
default = []
default = ["numa"]
numa = [
"subspace-farmer/numa",
]
integration-test = [
"sdk-utils/integration-test",
"sdk-traits/integration-test"
Expand Down
157 changes: 96 additions & 61 deletions farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmId, SingleDiskFarmInfo,
SingleDiskFarmOptions, SingleDiskFarmSummary,
};
use subspace_farmer::thread_pool_manager::PlottingThreadPoolManager;
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::{
all_cpu_cores, create_plotting_thread_pool_manager, thread_pool_core_indices,
};
use subspace_farmer::{Identity, KNOWN_PEERS_CACHE_SIZE};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed};
Expand All @@ -43,7 +47,7 @@ use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KnownPeersManager;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use tokio::sync::{mpsc, oneshot, watch, Mutex, Semaphore};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};
use tracing_futures::Instrument;

/// Description of the farm
Expand Down Expand Up @@ -77,26 +81,10 @@ mod builder {
use sdk_traits::Node;
use sdk_utils::{ByteSize, PublicKey};
use serde::{Deserialize, Serialize};
use tracing::warn;

use super::BuildError;
use crate::{FarmDescription, Farmer};

fn available_parallelism() -> usize {
match std::thread::available_parallelism() {
Ok(parallelism) => parallelism.get(),
Err(error) => {
warn!(
%error,
"Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \
options manually"
);

0
}
}
}

#[derive(
Debug,
Clone,
Expand Down Expand Up @@ -175,38 +163,36 @@ mod builder {
pub max_pieces_in_sector: Option<u16>,
/// Size of PER FARM thread pool used for farming (mostly for blocking
/// I/O, but also for some compute-intensive operations during
/// proving), defaults to number of CPU cores available in
/// the system
#[builder(default = "available_parallelism()")]
pub farming_thread_pool_size: usize,
/// Size of PER FARM thread pool used for plotting, defaults to number
/// of CPU cores available in the system.
/// proving), defaults to number of logical CPUs
/// available on UMA system and number of logical CPUs in
/// first NUMA node on NUMA system.
#[builder(default)]
pub farming_thread_pool_size: Option<NonZeroUsize>,
/// Size of one thread pool used for plotting, defaults to number of
/// logical CPUs available on UMA system and number of logical
/// CPUs available in NUMA node on NUMA system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer
/// will plot multiple sectors concurrently, see
/// `sector_downloading_concurrency` and
/// `sector_encoding_concurrency` options.
#[builder(default = "available_parallelism()")]
pub plotting_thread_pool_size: usize,
/// Size of PER FARM thread pool used for replotting, typically smaller
/// pool than for plotting to not affect farming as much,
/// defaults to half of the number of CPU cores available in the
/// system.
/// Number of thread pools is defined by `--sector-encoding-concurrency`
/// option, different thread pools might have different number
/// of threads if NUMA nodes do not have the same size.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer
/// will replot multiple sectors concurrently, see
/// `sector-downloading-concurrency` and
/// `sector-encoding-concurrency` options.
#[builder(default = "available_parallelism() / 2")]
pub replotting_thread_pool_size: usize,
/// Sector downloading concurrency
#[builder(default = "NonZeroUsize::new(2).expect(\"2 > 0\")")]
#[derivative(Default(value = "NonZeroUsize::new(2).expect(\"2 > 0\")"))]
pub sector_downloading_concurrency: NonZeroUsize,
/// Sector encoding concurrency
#[builder(default = "NonZeroUsize::new(1).expect(\"1 > 0\")")]
#[derivative(Default(value = "NonZeroUsize::new(1).expect(\"1 > 0\")"))]
pub sector_encoding_concurrency: NonZeroUsize,
/// Threads will be pinned to corresponding CPU cores at creation.
#[builder(default)]
pub plotting_thread_pool_size: Option<NonZeroUsize>,
/// the plotting process, defaults to `--sector-downloading-concurrency`
/// + 1 to download future sector ahead of time
#[builder(default)]
pub sector_downloading_concurrency: Option<NonZeroUsize>,
/// Defines how many sectors farmer will encode concurrently, defaults
/// to 1 on UMA system and number of NUMA nodes on NUMA system.
/// It is further restricted by `sector_downloading_concurrency`
/// and setting this option higher than
/// `sector_downloading_concurrency` will have no effect.
#[builder(default)]
pub sector_encoding_concurrency: Option<NonZeroUsize>,
/// Threads will be pinned to corresponding CPU cores at creation.
#[builder(default)]
pub replotting_thread_pool_size: Option<NonZeroUsize>,
}

impl Builder {
Expand Down Expand Up @@ -484,8 +470,65 @@ impl Config {
};

let mut plotting_delay_senders = Vec::with_capacity(farms.len());
let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get()));
let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));

let plotting_thread_pool_core_indices =
thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency);
let replotting_thread_pool_core_indices = {
let mut replotting_thread_pool_core_indices =
thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency);
if replotting_thread_pool_size.is_none() {
// The default behavior is to use all CPU cores, but for replotting we just want
// half
replotting_thread_pool_core_indices
.iter_mut()
.for_each(|set| set.truncate(set.cpu_cores().len() / 2));
}
replotting_thread_pool_core_indices
};

let downloading_semaphore = Arc::new(Semaphore::new(
sector_downloading_concurrency
.map(|sector_downloading_concurrency| sector_downloading_concurrency.get())
.unwrap_or(plotting_thread_pool_core_indices.len() + 1),
));

let all_cpu_cores = all_cpu_cores();
let plotting_thread_pool_manager = create_plotting_thread_pool_manager(
plotting_thread_pool_core_indices.into_iter().zip(replotting_thread_pool_core_indices),
)?;
let farming_thread_pool_size = farming_thread_pool_size
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(|| {
all_cpu_cores
.first()
.expect("Not empty according to function description; qed")
.cpu_cores()
.len()
});

if all_cpu_cores.len() > 1 {
info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected");

if all_cpu_cores.len() > farms.len() {
warn!(
numa_nodes = %all_cpu_cores.len(),
farms_count = %farms.len(),
"Too few disk farms, CPU will not be utilized fully during plotting, same number of farms as NUMA \
nodes or more is recommended"
);
}
}

// TODO: Remove code or environment variable once identified whether it helps or
// not
if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 {
unsafe {
libmimalloc_sys::mi_option_set(
libmimalloc_sys::mi_option_use_numa_nodes,
all_cpu_cores.len() as std::ffi::c_long,
);
}
}

for (disk_farm_idx, description) in farms.iter().enumerate() {
let (plotting_delay_sender, plotting_delay_receiver) =
Expand All @@ -503,11 +546,9 @@ impl Config {
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay: Some(plotting_delay_receiver),
downloading_semaphore: Arc::clone(&downloading_semaphore),
encoding_semaphore: Arc::clone(&encoding_semaphore),
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
})
.await?;

Expand Down Expand Up @@ -835,11 +876,9 @@ struct FarmOptions<'a, PG, N: sdk_traits::Node> {
pub erasure_coding: ErasureCoding,
pub max_pieces_in_sector: u16,
pub farming_thread_pool_size: usize,
pub plotting_thread_pool_size: usize,
pub replotting_thread_pool_size: usize,
pub plotting_delay: Option<futures::channel::oneshot::Receiver<()>>,
pub downloading_semaphore: Arc<Semaphore>,
pub encoding_semaphore: Arc<Semaphore>,
pub plotting_thread_pool_manager: PlottingThreadPoolManager,
}

impl<T: subspace_proof_of_space::Table> Farm<T> {
Expand All @@ -855,11 +894,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
erasure_coding,
max_pieces_in_sector,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay,
downloading_semaphore,
encoding_semaphore,
plotting_thread_pool_manager,
}: FarmOptions<
'_,
impl subspace_farmer_components::plotting::PieceGetter + Clone + Send + Sync + 'static,
Expand All @@ -884,11 +921,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
piece_getter,
cache_percentage,
downloading_semaphore,
encoding_semaphore,
farm_during_initial_plotting: false,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_thread_pool_manager,
plotting_delay,
};
let single_disk_farm_fut = SingleDiskFarm::new::<_, _, T>(description, disk_farm_idx);
Expand Down
Loading
Loading