Skip to content

Commit

Permalink
Merge pull request #137 from mulimoen/feature/mpi
Browse files Browse the repository at this point in the history
MPI support
  • Loading branch information
mulimoen authored Jun 19, 2024
2 parents b4c5100 + b5b8277 commit 52d4361
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 0 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,33 @@ jobs:
RUSTFLAGS: "-Z sanitizer=address"
RUSTDOCFLAGS: "-Z sanitizer=address"
run: cargo test --features netcdf-sys/static,netcdf/derive --target x86_64-unknown-linux-gnu --workspace --exclude netcdf-derive

mpi:
name: mpi-runner
runs-on: ubuntu-latest
env:
NETCDF_DIR: /usr/lib/x86_64-linux-gnu/netcdf/mpi/
steps:
- name: Checkout repository
uses: actions/checkout@v4
with: {submodules: false}

- name: Install netcdf
run: sudo apt-get update && sudo apt-get install libnetcdf-mpi-dev libhdf5-openmpi-dev

- name: Install rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "nightly"

- name: Build
run: cargo build --verbose --workspace --exclude netcdf-src --features netcdf/mpi,derive

- name: Test
run: cargo test --verbose --workspace --exclude netcdf-src --features netcdf/mpi,derive

- name: Run example
run: cargo run --verbose --package netcdf-examples --features mpi

- name: Run example in parallel
run: mpirun -np 10 --oversubscribe -- target/debug/netcdf-examples
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"netcdf-sys",
"netcdf-src",
"netcdf-derive",
"netcdf-examples",
]
default-members = ["netcdf", "netcdf-sys"]
resolver = "2"
Expand All @@ -14,3 +15,4 @@ netcdf-sys = { path = "netcdf-sys", version = "0.6.2" }
netcdf-src = { path = "netcdf-src", version = "0.3.6" }
netcdf-derive = { path = "netcdf-derive", version = "0.1.0" }
hdf5-sys = { version = "0.8.0" }
mpi-sys = { version = "0.2.1" }
14 changes: 14 additions & 0 deletions netcdf-examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "netcdf-examples"
version = "0.1.0"
edition = "2021"
publish = false

[features]
mpi = ["dep:mpi", "netcdf/mpi", "dep:mpi-sys"]

[dependencies]
netcdf = { workspace = true }
mpi = { version = "0.7.0", optional = true }
mpi-sys = { workspace = true, optional = true }
ndarray = "0.15.6"
10 changes: 10 additions & 0 deletions netcdf-examples/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[cfg(feature = "mpi")]
mod parallel;

fn main() {
#[cfg(feature = "mpi")]
parallel::main().unwrap();

#[cfg(not(feature = "mpi"))]
println!("MPI support is not included, will not run this example");
}
69 changes: 69 additions & 0 deletions netcdf-examples/src/parallel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use mpi::traits::{AsRaw, Communicator};

fn target_function(rank: i32, t: usize) -> i32 {
100 * (t as i32) + rank
}

fn mpi_null_info() -> mpi_sys::MPI_Info {
let mut info = std::ptr::null_mut();
let e = unsafe { mpi_sys::MPI_Info_create(&mut info) };
assert_eq!(e, mpi_sys::MPI_SUCCESS.try_into().unwrap());

info
}

fn create(
path: &str,
communicator: impl Communicator + AsRaw<Raw = mpi_sys::MPI_Comm>,
) -> Result<(), Box<dyn std::error::Error>> {
let info = mpi_null_info();
let mut file =
netcdf::create_par_with(path, communicator.as_raw(), info, netcdf::Options::NETCDF4)?;

let size = communicator.size() as usize;
let rank = communicator.rank();

file.add_dimension("x", size)?;
file.add_unlimited_dimension("t")?;
let var = file.add_variable::<i32>("output", &["t", "x"])?;
var.access_collective()?;

file.enddef()?;

let mut var = file.variable_mut("output").unwrap();

let values = ndarray::Array1::from_shape_fn(10, |t| target_function(rank, t));
var.put((.., rank as usize), values.view())?;

Ok(())
}

fn read(
path: &str,
communicator: impl Communicator + AsRaw<Raw = mpi_sys::MPI_Comm>,
) -> Result<(), Box<dyn std::error::Error>> {
let info = mpi_null_info();

let file = netcdf::open_par_with(path, communicator.as_raw(), info, netcdf::Options::empty())?;

let rank = communicator.rank();
let var = file.variable("output").unwrap();
var.access_collective()?;
let values = var.get::<i32, _>((.., rank as usize))?;

for (t, &v) in values.iter().enumerate() {
assert_eq!(v, target_function(rank, t));
}
Ok(())
}

pub fn main() -> Result<(), Box<dyn std::error::Error>> {
let universe = mpi::initialize().unwrap();
let path = "par.nc";

create(path, universe.world())?;

read(path, universe.world())?;

Ok(())
}
1 change: 1 addition & 0 deletions netcdf-src/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ exclude = [

[features]
dap = ["dep:link-cplusplus"]
mpi = []

[dependencies]
hdf5-sys = { workspace = true, features = ["hl", "deprecated", "zlib"] }
Expand Down
4 changes: 4 additions & 0 deletions netcdf-src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ fn main() {
netcdf_config.define("ENABLE_BYTERANGE", "ON");
}

if feature!("MPI").is_ok() {
panic!("MPI feature was requested but the static build of netcdf does not support this");
}

let netcdf = netcdf_config.build();

println!("cargo:lib=netcdf");
Expand Down
2 changes: 2 additions & 0 deletions netcdf-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ libz-sys = { version = "1.0.25" }
curl-sys = { version = "0.4.51", optional = true }
hdf5-sys = { workspace = true }
netcdf-src = { workspace = true, optional = true }
mpi-sys = { workspace = true, optional = true }

[dev-dependencies]

Expand All @@ -32,6 +33,7 @@ default = []
memio = []
static = ["libz-sys/static", "hdf5-sys/static", "hdf5-sys/hl", "hdf5-sys/deprecated", "hdf5-sys/zlib", "dep:netcdf-src", "curl-sys?/static-curl", "curl-sys?/static-ssl"]
dap = ["dep:curl-sys", "netcdf-src?/dap"]
mpi = ["dep:mpi-sys", "netcdf-src?/mpi"]

[build-dependencies]
semver = "1.0.9"
Expand Down
8 changes: 8 additions & 0 deletions netcdf-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ impl NcMetaHeader {
"MEMIO requested but not found in this installation of netCDF"
);
}
if self.has_parallel {
println!("cargo:rustc-cfg=feature=\"has-par\"");
} else {
assert!(
feature!("MPI").is_err(),
"MPI requested but not found in this installation of netCDF"
);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions netcdf-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ mod filter;
#[cfg(feature = "4.8.0")]
pub use filter::*;

#[cfg(feature = "mpi")]
pub mod par;

use std::sync::Mutex;

/// Global netCDF lock for using all functions in the netCDF library
Expand Down
25 changes: 25 additions & 0 deletions netcdf-sys/src/par.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#![cfg(feature = "mpi")]
use std::ffi::{c_char, c_int};

use mpi_sys::{MPI_Comm, MPI_Info};

pub const NC_INDEPENDENT: c_int = 0;
pub const NC_COLLECTIVE: c_int = 1;

extern "C" {
pub fn nc_create_par(
path: *const c_char,
cmode: c_int,
comm: MPI_Comm,
info: MPI_Info,
ncidp: *mut c_int,
) -> c_int;
pub fn nc_open_par(
path: *const c_char,
mode: c_int,
comm: MPI_Comm,
info: MPI_Info,
ncidp: *mut c_int,
) -> c_int;
pub fn nc_var_par_access(ncid: c_int, varid: c_int, par_access: c_int) -> c_int;
}
3 changes: 3 additions & 0 deletions netcdf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ build = "build.rs"
default = ["ndarray"]
static = ["netcdf-sys/static"]
derive = ["dep:netcdf-derive"]
mpi = ["dep:mpi-sys", "netcdf-sys/mpi"]
ndarray = ["dep:ndarray"]

[dependencies]
ndarray = { version = "0.15", optional = true }
netcdf-sys = { workspace = true }
netcdf-derive = { workspace = true, optional = true }
bitflags = "2.4.2"
libc = "0.2.155"
mpi-sys = { workspace = true, optional = true }

[dev-dependencies]
clap = { version = "4.5.1", features = ["derive"] }
Expand Down
67 changes: 67 additions & 0 deletions netcdf/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,30 @@ impl RawFile {
Ok(File(Self { ncid }))
}

/// Open a `netCDF` file in read only mode in parallel mode.
#[cfg(feature = "mpi")]
pub(crate) fn open_par_with(
path: &path::Path,
communicator: mpi_sys::MPI_Comm,
info: mpi_sys::MPI_Info,
options: Options,
) -> error::Result<File> {
let f = get_ffi_from_path(path);
let mut ncid: nc_type = 0;
unsafe {
error::checked(with_lock(|| {
netcdf_sys::par::nc_open_par(
f.as_ptr().cast(),
options.bits(),
communicator,
info,
&mut ncid,
)
}))?;
}
Ok(File(Self { ncid }))
}

/// Open a `netCDF` file in append mode (read/write).
pub(crate) fn append_with(path: &path::Path, options: Options) -> error::Result<FileMut> {
let file = Self::open_with(path, options | Options::WRITE)?;
Expand All @@ -105,6 +129,31 @@ impl RawFile {
Ok(FileMut(File(Self { ncid })))
}

/// Create a new `netCDF` file in parallel mode
#[cfg(feature = "mpi")]
pub(crate) fn create_par_with(
path: &path::Path,
communicator: mpi_sys::MPI_Comm,
info: mpi_sys::MPI_Info,
options: Options,
) -> error::Result<FileMut> {
let f = get_ffi_from_path(path);
let mut ncid: nc_type = -1;
unsafe {
error::checked(with_lock(|| {
netcdf_sys::par::nc_create_par(
f.as_ptr().cast(),
options.bits(),
communicator,
info,
&mut ncid,
)
}))?;
}

Ok(FileMut(File(Self { ncid })))
}

#[cfg(feature = "has-mmap")]
pub(crate) fn open_from_memory<'buffer>(
name: Option<&str>,
Expand Down Expand Up @@ -225,6 +274,14 @@ impl File {
.unwrap()
.map(Result::unwrap)
}
/// Get the length of a dimension
pub fn dimension_len(&self, name: &str) -> Option<usize> {
let (ncid, name) =
super::group::try_get_parent_ncid_and_stem(self.ncid(), name).unwrap()?;
super::dimension::dimension_from_name(ncid, name)
.unwrap()
.map(|x| x.len())
}

/// Get a group
///
Expand Down Expand Up @@ -440,6 +497,16 @@ impl FileMut {
let Self(File(file)) = self;
file.close()
}

/// Open the file for new definitions
pub fn redef(&mut self) -> error::Result<()> {
error::checked(with_lock(|| unsafe { netcdf_sys::nc_redef(self.ncid()) }))
}

/// Close the file for new definitions
pub fn enddef(&mut self) -> error::Result<()> {
error::checked(with_lock(|| unsafe { netcdf_sys::nc_enddef(self.ncid()) }))
}
}

#[cfg(feature = "has-mmap")]
Expand Down
30 changes: 30 additions & 0 deletions netcdf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) mod error;
pub(crate) mod extent;
pub(crate) mod file;
pub(crate) mod group;
#[cfg(feature = "mpi")]
pub(crate) mod par;
pub(crate) mod putget;
#[cfg(feature = "4.9.2")]
pub mod rc;
Expand Down Expand Up @@ -170,6 +172,20 @@ where
RawFile::create_with(name.as_ref(), options)
}

/// Open a `netCDF` file in create and parallel mode with the given options
#[cfg(feature = "mpi")]
pub fn create_par_with<P>(
name: P,
communicator: mpi_sys::MPI_Comm,
info: mpi_sys::MPI_Info,
options: Options,
) -> error::Result<FileMut>
where
P: AsRef<std::path::Path>,
{
RawFile::create_par_with(name.as_ref(), communicator, info, options)
}

/// Open a `netCDF` file in append mode
pub fn append<P>(name: P) -> error::Result<FileMut>
where
Expand All @@ -194,6 +210,20 @@ where
open_with(name, Options::default())
}

/// Open in parallel mode
#[cfg(feature = "mpi")]
pub fn open_par_with<P>(
name: P,
communicator: mpi_sys::MPI_Comm,
info: mpi_sys::MPI_Info,
options: Options,
) -> error::Result<File>
where
P: AsRef<std::path::Path>,
{
RawFile::open_par_with(name.as_ref(), communicator, info, options)
}

/// Open a `netCDF` file in read mode with the given options
pub fn open_with<P>(name: P, options: Options) -> error::Result<File>
where
Expand Down
Loading

0 comments on commit 52d4361

Please sign in to comment.