From c37311629005d66f13fa5308a3fc0da35c7d867c Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sun, 1 Oct 2023 08:54:21 -0400 Subject: [PATCH 01/16] Refactor cat_dset_3d.py for easier parallelization --- python_scripts/cat_dset_3D.py | 191 +++++++++++++++++++++------------- 1 file changed, 118 insertions(+), 73 deletions(-) diff --git a/python_scripts/cat_dset_3D.py b/python_scripts/cat_dset_3D.py index 4cff6dc9a..7c403933e 100755 --- a/python_scripts/cat_dset_3D.py +++ b/python_scripts/cat_dset_3D.py @@ -17,9 +17,10 @@ import argparse import pathlib +# ====================================================================================================================== def main(): """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the - command line. If you're importing this file then use the `concat_3d` function directly. + command line. If you're importing this file then use the `concat_3d` or `concat_3d_single` functions directly. """ # Argument handling cli = argparse.ArgumentParser() @@ -30,6 +31,10 @@ def main(): # Optional Arguments cli.add_argument('-i', '--input_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The input directory.') cli.add_argument('-o', '--output_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The output directory.') + cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.') + cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') + cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') + cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') args = cli.parse_args() # Perform the concatenation @@ -37,15 +42,23 @@ def main(): end_num=args.end_num, num_processes=args.num_processes, input_dir=args.input_dir, - output_dir=args.output_dir) - + output_dir=args.output_dir, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts) +# ====================================================================================================================== # ====================================================================================================================== def concat_3d(start_num: int, end_num: int, num_processes: int, input_dir: pathlib.Path = pathlib.Path.cwd(), - output_dir: pathlib.Path = pathlib.Path.cwd()): + output_dir: pathlib.Path = pathlib.Path.cwd(), + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None): """Concatenate 3D HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a single, large file. All outputs from start_num to end_num will be concatenated. @@ -55,6 +68,10 @@ def concat_3d(start_num: int, num_processes (int): The number of processes that were used input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). + skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. + destination_dtype (np.dtype, optional): The data type of the output datasets. 
Accepts most numpy types. Defaults to the same as the input datasets. + compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. + compression_options (str, optional): What compression settings to use if compressing. Defaults to None. """ # Error checking @@ -65,77 +82,105 @@ def concat_3d(start_num: int, # loop over outputs for n in range(start_num, end_num+1): + concat_3d_single(output_number=n, + num_processes=num_processes, + input_dir=input_dir, + output_dir=output_dir, + skip_fields=skip_fields, + destination_dtype=destination_dtype, + compression_type=compression_type, + compression_options=compression_options) +# ====================================================================================================================== - # loop over files for a given output - for i in range(0, num_processes): - - # open the output file for writing (don't overwrite if exists) - fileout = h5py.File(output_dir / f'{n}.h5', 'a') - # open the input file for reading - filein = h5py.File(input_dir / f'{n}.h5.{i}', 'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['dims'] = [nx, ny, nz] - fileout.attrs['gamma'] = [head['gamma'][0]] - fileout.attrs['t'] = [head['t'][0]] - fileout.attrs['dt'] = [head['dt'][0]] - fileout.attrs['n_step'] = [head['n_step'][0]] - - units = ['time_unit', 'mass_unit', 'length_unit', 'energy_unit', 'velocity_unit', 'density_unit'] - for unit in units: - fileout.attrs[unit] = [head[unit][0]] - - d = fileout.create_dataset("density", (nx, ny, nz), chunks=True, dtype=filein['density'].dtype) - mx = fileout.create_dataset("momentum_x", (nx, ny, nz), chunks=True, dtype=filein['momentum_x'].dtype) - my = fileout.create_dataset("momentum_y", (nx, ny, nz), chunks=True, dtype=filein['momentum_y'].dtype) - mz = fileout.create_dataset("momentum_z", (nx, ny, nz), chunks=True, dtype=filein['momentum_z'].dtype) - E = fileout.create_dataset("Energy", (nx, ny, nz), chunks=True, dtype=filein['Energy'].dtype) - try: - GE = fileout.create_dataset("GasEnergy", (nx, ny, nz), chunks=True, dtype=filein['GasEnergy'].dtype) - except KeyError: - print('No Dual energy data present'); - try: - bx = fileout.create_dataset("magnetic_x", (nx+1, ny, nz), chunks=True, dtype=filein['magnetic_x'].dtype) - by = fileout.create_dataset("magnetic_y", (nx, ny+1, nz), chunks=True, dtype=filein['magnetic_y'].dtype) - bz = fileout.create_dataset("magnetic_z", (nx, ny, nz+1), chunks=True, dtype=filein['magnetic_z'].dtype) - except KeyError: - print('No magnetic field data present'); - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - fileout['density'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['density'] - fileout['momentum_x'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['momentum_x'] - fileout['momentum_y'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['momentum_y'] - fileout['momentum_z'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['momentum_z'] - fileout['Energy'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['Energy'] - try: - fileout['GasEnergy'][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein['GasEnergy'] - except KeyError: - print('No Dual energy data 
present'); - try: - fileout['magnetic_x'][xs:xs+nxl+1, ys:ys+nyl, zs:zs+nzl] = filein['magnetic_x'] - fileout['magnetic_y'][xs:xs+nxl, ys:ys+nyl+1, zs:zs+nzl] = filein['magnetic_y'] - fileout['magnetic_z'][xs:xs+nxl, ys:ys+nyl, zs:zs+nzl+1] = filein['magnetic_z'] - except KeyError: - print('No magnetic field data present'); - - filein.close() - - fileout.close() # ====================================================================================================================== +def concat_3d_single(output_number: int, + num_processes: int, + input_dir: pathlib.Path = pathlib.Path.cwd(), + output_dir: pathlib.Path = pathlib.Path.cwd(), + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None): + """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a + single, large file. + + Args: + output_number (int): The output + end_num (int): The last output step to concatenate + num_processes (int): The number of processes that were used + input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). + output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). + skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. + destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. + compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. + compression_options (str, optional): What compression settings to use if compressing. Defaults to None. + """ + + # Error checking + assert num_processes > 1, 'num_processes must be greater than 1' + assert output_number >= 0, 'output_number must be greater than or equal to 0' + + # open the output file for writing (don't overwrite if exists) + fileout = h5py.File(output_dir / f'{output_number}.h5', 'a') + + # Setup the output file + with h5py.File(input_dir / f'{output_number}.h5.0', 'r') as source_file: + # Copy header data + fileout = copy_header(source_file, fileout) + + # Create the datasets in the output file + datasets_to_copy = list(source_file.keys()) + datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] + + for dataset in datasets_to_copy: + dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype + + data_shape = source_file.attrs['dims'] + + fileout.create_dataset(name=dataset, + shape=data_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) + + # loop over files for a given output + for i in range(0, num_processes): + # open the input file for reading + filein = h5py.File(input_dir / f'{output_number}.h5.{i}', 'r') + # read in the header data from the input file + head = filein.attrs + + # write data from individual processor file to correct location in concatenated file + nx_local, ny_local, nz_local = filein.attrs['dims_local'] + x_start, y_start, z_start = filein.attrs['offset'] + + for dataset in datasets_to_copy: + fileout[dataset][x_start:x_start+nx_local, y_start:y_start+ny_local,z_start:z_start+nz_local] = filein[dataset] + + filein.close() + + fileout.close() +# ====================================================================================================================== + +# ============================================================================== +def 
copy_header(source_file: h5py.File, destination_file: h5py.File): + """Copy the attributes of one HDF5 file to another, skipping all fields that are specific to an individual rank + + Args: + source_file (h5py.File): The source file + destination_file (h5py.File): The destination file + + Returns: + h5py.File: The destination file with the new header attributes + """ + fields_to_skip = ['dims_local', 'offset'] + + for attr_key in source_file.attrs.keys(): + if attr_key not in fields_to_skip: + destination_file.attrs[attr_key] = source_file.attrs[attr_key] + + return destination_file +# ============================================================================== if __name__ == '__main__': main() From 47f943002e00ece296139ab350b8be60a5036595 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sun, 1 Oct 2023 13:05:04 -0400 Subject: [PATCH 02/16] Refactor cat_slice.py to be more flexible + a CLI Adds a CLI to cat_slice.py, removes all the hardcoded variables. Also, adds a new function internally, `cat_slice` that can be imported into other scripts and used from there, including in parallel with Dask. --- python_scripts/cat_slice.py | 332 ++++++++++++++++++++++-------------- 1 file changed, 206 insertions(+), 126 deletions(-) diff --git a/python_scripts/cat_slice.py b/python_scripts/cat_slice.py index 7b6d15e12..51aae2d6d 100644 --- a/python_scripts/cat_slice.py +++ b/python_scripts/cat_slice.py @@ -1,130 +1,210 @@ -# Example file for concatenating on-axis slice data -# created when the -DSLICES flag is turned on +#!/usr/bin/env python3 +""" +Python script for concatenating slice hdf5 datasets for when -DSLICES is turned +on in Cholla. Includes a CLI for concatenating Cholla HDF5 datasets and can be +imported into other scripts where the `concat_slice` function can be used to +concatenate the HDF5 files. 
+ +Generally the easiest way to import this script is to add the `python_scripts` +directory to your python path in your script like this: +``` +import sys +sys.path.append('/PATH/TO/CHOLLA/python_scripts') +import cat_slice +``` +""" import h5py +import argparse +import pathlib import numpy as np -ns = 0 -ne = 2 -n_procs = 4 # number of processors that did the cholla calculation -dnamein = '/gpfs/alpine/proj-shared/csc380/orlandow/o_cholla/out.21Sep20-Mon-14.17-357075-SOR_HYDRO_DISK/raw/' -dnameout = '/gpfs/alpine/proj-shared/csc380/orlandow/o_cholla/out.21Sep20-Mon-14.17-357075-SOR_HYDRO_DISK/catted_files' - -DE = True # set to True if Dual Energy flag was used -SCALAR = False # set to True if Scalar was used - -# loop over the output times -for n in range(ns, ne+1): - - # open the output file for writing - fileout = h5py.File(dnameout+str(n)+'_slice.h5', 'w') - - # loop over files for a given output time - for i in range(0, n_procs): - - # open the input file for reading - filein = h5py.File(dnamein+str(n)+'_slice.h5.'+str(i), 'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - gamma = head['gamma'] - t = head['t'] - dt = head['dt'] - n_step = head['n_step'] - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['gamma'] = gamma - fileout.attrs['t'] = t - fileout.attrs['dt'] = dt - fileout.attrs['n_step'] = n_step - fileout.attrs['dims'] = [nx, ny, nz] - - d_xy = np.zeros((nx,ny)) - d_xz = np.zeros((nx,nz)) - d_yz = np.zeros((ny,nz)) - mx_xy = np.zeros((nx,ny)) - mx_xz = np.zeros((nx,nz)) - mx_yz = np.zeros((ny,nz)) - my_xy = np.zeros((nx,ny)) - my_xz = np.zeros((nx,nz)) - my_yz = np.zeros((ny,nz)) - mz_xy = np.zeros((nx,ny)) - mz_xz = np.zeros((nx,nz)) - mz_yz = np.zeros((ny,nz)) - E_xy = np.zeros((nx,ny)) - E_xz = np.zeros((nx,nz)) - E_yz = np.zeros((ny,nz)) - if DE: - GE_xy = np.zeros((nx,ny)) - GE_xz = np.zeros((nx,nz)) - GE_yz = np.zeros((ny,nz)) - if SCALAR: - scalar_xy = np.zeros((nx,ny)) - scalar_xz = np.zeros((nx,nz)) - scalar_yz = np.zeros((ny,nz)) - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - - d_xy[xs:xs+nxl,ys:ys+nyl] += filein['d_xy'] - d_xz[xs:xs+nxl,zs:zs+nzl] += filein['d_xz'] - d_yz[ys:ys+nyl,zs:zs+nzl] += filein['d_yz'] - mx_xy[xs:xs+nxl,ys:ys+nyl] += filein['mx_xy'] - mx_xz[xs:xs+nxl,zs:zs+nzl] += filein['mx_xz'] - mx_yz[ys:ys+nyl,zs:zs+nzl] += filein['mx_yz'] - my_xy[xs:xs+nxl,ys:ys+nyl] += filein['my_xy'] - my_xz[xs:xs+nxl,zs:zs+nzl] += filein['my_xz'] - my_yz[ys:ys+nyl,zs:zs+nzl] += filein['my_yz'] - mz_xy[xs:xs+nxl,ys:ys+nyl] += filein['mz_xy'] - mz_xz[xs:xs+nxl,zs:zs+nzl] += filein['mz_xz'] - mz_yz[ys:ys+nyl,zs:zs+nzl] += filein['mz_yz'] - E_xy[xs:xs+nxl,ys:ys+nyl] += filein['E_xy'] - E_xz[xs:xs+nxl,zs:zs+nzl] += filein['E_xz'] - E_yz[ys:ys+nyl,zs:zs+nzl] += filein['E_yz'] - if DE: - GE_xy[xs:xs+nxl,ys:ys+nyl] += filein['GE_xy'] - GE_xz[xs:xs+nxl,zs:zs+nzl] += filein['GE_xz'] - GE_yz[ys:ys+nyl,zs:zs+nzl] += filein['GE_yz'] - if SCALAR: - scalar_xy[xs:xs+nxl,ys:ys+nyl] += filein['scalar_xy'] - scalar_xz[xs:xs+nxl,zs:zs+nzl] += filein['scalar_xz'] - scalar_yz[ys:ys+nyl,zs:zs+nzl] += filein['scalar_yz'] - - filein.close() - - # wrte out the new datasets - 
fileout.create_dataset('d_xy', data=d_xy) - fileout.create_dataset('d_xz', data=d_xz) - fileout.create_dataset('d_yz', data=d_yz) - fileout.create_dataset('mx_xy', data=mx_xy) - fileout.create_dataset('mx_xz', data=mx_xz) - fileout.create_dataset('mx_yz', data=mx_yz) - fileout.create_dataset('my_xy', data=my_xy) - fileout.create_dataset('my_xz', data=my_xz) - fileout.create_dataset('my_yz', data=my_yz) - fileout.create_dataset('mz_xy', data=mz_xy) - fileout.create_dataset('mz_xz', data=mz_xz) - fileout.create_dataset('mz_yz', data=mz_yz) - fileout.create_dataset('E_xy', data=E_xy) - fileout.create_dataset('E_xz', data=E_xz) - fileout.create_dataset('E_yz', data=E_yz) - if DE: - fileout.create_dataset('GE_xy', data=GE_xy) - fileout.create_dataset('GE_xz', data=GE_xz) - fileout.create_dataset('GE_yz', data=GE_yz) - if SCALAR: - fileout.create_dataset('scalar_xy', data=scalar_xy) - fileout.create_dataset('scalar_xz', data=scalar_xz) - fileout.create_dataset('scalar_yz', data=scalar_yz) - - fileout.close() +from cat_dset_3D import copy_header + +# ============================================================================== +def main(): + """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the + command line. If you're importing this file then use the `concat_slice` function directly. + """ + # Argument handling + cli = argparse.ArgumentParser() + # Required Arguments + cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the source HDF5 files.') + cli.add_argument('-o', '--output-file', type=pathlib.Path, required=True, help='The path and filename of the concatenated file.') + cli.add_argument('-n', '--num-processes', type=int, required=True, help='The number of processes that were used to generate the slices.') + cli.add_argument('-t', '--output-num', type=int, required=True, help='The output number to be concatenated') + # Optional Arguments + cli.add_argument('--xy', type=bool, default=True, help='If True then concatenate the XY slice. Defaults to True.') + cli.add_argument('--yz', type=bool, default=True, help='If True then concatenate the YZ slice. Defaults to True.') + cli.add_argument('--xz', type=bool, default=True, help='If True then concatenate the XZ slice. Defaults to True.') + cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.') + cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') + cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') + cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. 
Defaults to None.') + args = cli.parse_args() + + # Perform the concatenation + concat_slice(source_directory=args.source_directory, + destination_file_path=args.output_file, + num_ranks=args.num_processses, + output_number=args.output_num, + concat_xy=args.xy, + concat_yz=args.yz, + concat_xz=args.xz, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts) +# ============================================================================== + +# ============================================================================== +def concat_slice(source_directory: pathlib.Path, + destination_file_path: pathlib.Path, + num_ranks: int, + output_number: int, + concat_xy: bool = True, + concat_yz: bool = True, + concat_xz: bool = True, + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None): + """Concatenate slice HDF5 Cholla datasets. i.e. take the single files + generated per process and concatenate them into a single, large file. This + function concatenates a single output time and can be called multiple times, + potentially in parallel, to concatenate multiple output times. + + Args: + source_directory (pathlib.Path): The directory containing the unconcatenated files + destination_file_path (pathlib.Path): The path and name of the new concatenated file + num_ranks (int): The number of ranks that Cholla was run with + output_number (int): The output number to concatenate + concat_xy (bool, optional): If True then concatenate the XY slice. Defaults to True. + concat_yz (bool, optional): If True then concatenate the YZ slice. Defaults to True. + concat_xz (bool, optional): If True then concatenate the XZ slice. Defaults to True. + skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. + destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. + compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. + compression_options (str, optional): What compression settings to use if compressing. Defaults to None. 
+ """ + # Open destination file and first file for getting metadata + source_file = h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') + destination_file = h5py.File(destination_file_path, 'w') + + # Copy over header + destination_file = copy_header(source_file, destination_file) + + # Get a list of all datasets in the source file + datasets_to_copy = list(source_file.keys()) + + # Filter the datasets to only include those I wish to copy + if not concat_xy: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xy' in dataset] + if not concat_yz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'yz' in dataset] + if not concat_xz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xz' in dataset] + datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] + + # Create the datasets in the destination file + for dataset in datasets_to_copy: + dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype + + slice_shape = get_slice_shape(source_file, dataset) + + destination_file.create_dataset(name=dataset, + shape=slice_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) + + # Close source file in prep for looping through source files + source_file.close() + + # Copy data + for rank in range(num_ranks): + # Open source file + source_file = h5py.File(source_directory / f'{output_number}_slice.h5.{rank}', 'r') + + # Loop through and copy datasets + for dataset in datasets_to_copy: + # Determine locations and shifts for writing + (i0_start, i0_end, i1_start, i1_end), file_in_slice = write_bounds(source_file, dataset) + + if file_in_slice: + # Copy the data + destination_file[dataset][i0_start:i0_end, i1_start:i1_end] = source_file[dataset] + + # Now that the copy is done we close the source file + source_file.close() + + # Close destination file now that it is fully constructed + destination_file.close() +# ============================================================================== + +# ============================================================================== +def get_slice_shape(source_file: h5py.File, dataset: str): + """Determine the shape of the full slice in a dataset + + Args: + source_file (h5py.File): The source file the get the shape information from + dataset (str): The dataset to get the shape of + + Raises: + ValueError: If the dataset name isn't a slice name + + Returns: + tuple: The 2D dimensions of the slice + """ + nx, ny, nz = source_file.attrs['dims'] + + if 'xy' in dataset: + slice_dimensions = (nx, ny) + elif 'yz' in dataset: + slice_dimensions = (ny, nz) + elif 'xz' in dataset: + slice_dimensions = (nx, nz) + else: + raise ValueError(f'Dataset "{dataset}" is not a slice.') + + return slice_dimensions +# ============================================================================== + +# ============================================================================== +def write_bounds(source_file: h5py.File, dataset: str): + """Determine the bounds of the concatenated file to write to + + Args: + source_file (h5py.File): The source file to read from + dataset (str): The name of the dataset to read from the source file + + Raises: + ValueError: If the dataset name isn't a slice name + + Returns: + tuple: The write bounds for the concatenated file to be used like `output_file[dataset][return[0]:return[1], return[2]:return[3]] + """ + nx, ny, nz = source_file.attrs['dims'] + nx_local, ny_local, nz_local = 
source_file.attrs['dims_local'] + x_start, y_start, z_start = source_file.attrs['offset'] + + if 'xy' in dataset: + file_in_slice = z_start <= nz//2 <= z_start+nz_local + bounds = (x_start, x_start+nx_local, y_start, y_start+ny_local) + elif 'yz' in dataset: + file_in_slice = x_start <= nx//2 <= x_start+nx_local + bounds = (y_start, y_start+ny_local, z_start, z_start+nz_local) + elif 'xz' in dataset: + file_in_slice = y_start <= ny//2 <= y_start+ny_local + bounds = (x_start, x_start+nx_local, z_start, z_start+nz_local) + else: + raise ValueError(f'Dataset "{dataset}" is not a slice.') + + return bounds, file_in_slice +# ============================================================================== + +if __name__ == '__main__': + from timeit import default_timer + start = default_timer() + main() + print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From ecc13aebb069986429c0c2f7a447b55ff427e220 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sun, 1 Oct 2023 13:32:26 -0400 Subject: [PATCH 03/16] Add python templates for using Dask One template for using Dask on a single machine and one for use on a distributed system, specifically OLCF systems Andes, Crusher, and Frontier. --- python_scripts/dask_distributed_template.py | 132 ++++++++++++++++++ .../dask_single_machine_template.py | 47 +++++++ 2 files changed, 179 insertions(+) create mode 100755 python_scripts/dask_distributed_template.py create mode 100755 python_scripts/dask_single_machine_template.py diff --git a/python_scripts/dask_distributed_template.py b/python_scripts/dask_distributed_template.py new file mode 100755 index 000000000..ac40294b2 --- /dev/null +++ b/python_scripts/dask_distributed_template.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +This is the skeleton for how to run a Dask script on Andes at the OLCF. The CLI +commands required are in the docstring at the top, major Dask steps are in +functions, and `main` is mostly empty with a clear area on where to do your +computations. + +Requirements: - Verified working with Dask v2023.6.0 - Install graphviz for +python + - 'conda install -c conda-forge python-graphviz graphviz' + - Make sure your version of msgpack-python is at least v1.0.5; v1.0.3 had a bug + - `conda install -c conda-forge msgpack-python=1.0.5` + +Notes: +- This is entirely focused on getting Dask to run on Andes, Crusher, and + Frontier. Other systems will likely need similar steps but not identical +- Between each python script the Dask scheduler and workers need to be + restarted. +- "--interface ib0" does not seem to be required but likely does improve + transfer speeds. On Crusher it throws an error, just omit it +- It likes to spit out lots of ugly messages on shutdown that look like + something failed. Odds are that it worked fine and just didn't shutdown + gracefully +- On OLCF systems Dask seems to hang on setup if you use more than 256 + processes. I haven't dug too deeply into it but for now it might be better to + limit jobs to that size and run them longer or run multiple jobs, potentially + an array job +- On OLCF systems it doesn't always end the job properly and the job will just + keep running and do nothing. Either set short walltimes so it times out or + just keep an eye on it. 
Maybe end with the script sending an exit command + +################################################################################ +#!/usr/bin/env bash + +#SBATCH -A +#SBATCH -J +#SBATCH -o /%x-%j.out +#SBATCH -t 04:00:00 +#SBATCH -p batch +#SBATCH -N 32 +#SBATCH --mail-user= #SBATCH --mail-type=ALL + +# Setup some parameters DASK_SCHEDULE_FILE=$(pwd)/dask_schedule_file.json +DASK_NUM_WORKERS=$((SLURM_JOB_NUM_NODES*8)) + +# Add any scripts that you're importing to the PYTHONPATH, even ones in the same +# directory. The worker tasks have their own directories and won't find any of +# your scripts unless they're in the PYTHONPATH +export PYTHONPATH="${PYTHONPATH}:/your/path/here" + +INTERFACE='--interface ib0' # For Andes +# INTERFACE='' # For Crusher + +srun --exclusive --ntasks=1 dask scheduler $INTERFACE --scheduler-file $DASK_SCHEDULE_FILE --no-dashboard --no-show & + +# Wait for the dask-scheduler to start +sleep 30 + +srun --exclusive --ntasks=$DASK_NUM_WORKERS dask worker --scheduler-file $DASK_SCHEDULE_FILE --memory-limit='auto' --worker-class distributed.Worker $INTERFACE --no-dashboard --local-directory & + +# Wait for workers to start +sleep 10 + +python -u ./dask-distributed-template.py --scheduler-file $DASK_SCHEDULE_FILE --num-workers $DASK_NUM_WORKERS + +wait +################################################################################ +""" + +import dask +import dask.array as da +import dask.dataframe as dd +from dask.distributed import Client +from dask import graph_manipulation + +import pathlib +import argparse + +# ============================================================================== +def main(): + # Get command line arguments + cli = argparse.ArgumentParser() + # Required Arguments + cli.add_argument('-N', '--num-workers', type=int, required=True, help='The number of workers to use') + cli.add_argument('-s', '--scheduler-file', type=pathlib.Path, required=True, help='The path to the scheduler file') + # Optional Arguments + # none yet, feel free to add your own + args = cli.parse_args() + + # Setup the Dask cluster + client = startup_dask(args.scheduler_file, args.num_workers) + + # Perform your computation + # ... + # ... + # ... 
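For concreteness, here is a minimal sketch (not part of the committed template) of what the computation block above might contain when the concatenation scripts from this series are driven through Dask: one `dask.delayed` task per output, all executed with a single `dask.compute` call, mirroring the first suggestion listed just below. The output range, rank count, and directory paths are placeholders, and `concat_3d_single` is simply the per-output function introduced in the first patch; substitute whatever function and arguments match your run.

```
from cat_dset_3D import concat_3d_single

raw_dir    = pathlib.Path('/path/to/raw')     # placeholder: directory holding the per-rank files
catted_dir = pathlib.Path('/path/to/catted')  # placeholder: directory for the concatenated files

# Build one lazy task per output; nothing runs until dask.compute is called
command_list = [dask.delayed(concat_3d_single)(output_number=n,
                                               num_processes=1024,  # placeholder rank count
                                               input_dir=raw_dir,
                                               output_dir=catted_dir)
                for n in range(100)]

# Because a distributed Client was created above, dask.compute sends these tasks to the cluster
dask.compute(*command_list)
```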
+ # Some suggestions: + # - If you're using Delayed then append all tasks to a list and execute them with `dask.compute(*command_list)` + # - Visualize task tree with `dask.visualize(*command_list, filename=str('filename.pdf')) + # - Add dependencies manually with `dask.graph_manipulation.bind(dependent_task, list_of_dependencies)` + # End of Computation + + # Shutdown the Dask cluster + shutdown_dask(client) +# ============================================================================== + +# ============================================================================== +def startup_dask(scheduler_file, num_workers): + # Connect to the dask-cluster + client = Client(scheduler_file=scheduler_file) + print('client information ', client) + + # Block until num_workers are ready + print(f'Waiting for {num_workers} workers...') + client.wait_for_workers(n_workers=num_workers) + + num_connected_workers = len(client.scheduler_info()['workers']) + print(f'{num_connected_workers} workers connected') + + return client +# ============================================================================== + +# ============================================================================== +def shutdown_dask(client): + print('Shutting down the cluster') + workers_list = list(client.scheduler_info()['workers']) + client.retire_workers(workers_list, close_workers=True) + client.shutdown() +# ============================================================================== + +if __name__ == '__main__': + main() diff --git a/python_scripts/dask_single_machine_template.py b/python_scripts/dask_single_machine_template.py new file mode 100755 index 000000000..7816ec791 --- /dev/null +++ b/python_scripts/dask_single_machine_template.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +""" +================================================================================ + Written by Robert Caddy. + + A simple template for Dask scripts running on a single machine +================================================================================ +""" + +import dask +import dask.array as da +import dask.dataframe as dd +from dask import graph_manipulation + +import argparse +import pathlib + +# ============================================================================== +def main(): + cli = argparse.ArgumentParser() + # Required Arguments + # Optional Arguments + cli.add_argument('-n', '--num-workers', type=int, default=8, help='The number of workers to use.') + args = cli.parse_args() + + # Set scheduler type. Options are 'threads', 'processes', 'single-threaded', and 'distributed'. + # - 'threads' uses threads that share memory, often fastest on single machines, can run into issuse with the GIL + # - 'processes' uses multiple processes that do not share memory, can be used to get around issues with the GIL + # - `single-threaded` is great for debugging + dask.config.set(scheduler='processes', num_workers=args.num_workers) + + # Perform your computation + # ... + # ... + # ... 
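In the same spirit, a tiny hypothetical sketch of what the computation block above might hold in the single-machine template, again following the task-list pattern recommended just below; `process_output` and its inputs are invented purely for illustration.

```
# Hypothetical stand-in for real per-output work (e.g. opening one file and reducing it)
@dask.delayed
def process_output(n):
    return n ** 2

# Append every task to a list, then execute them all in one call
command_list = [process_output(n) for n in range(args.num_workers)]
results = dask.compute(*command_list)
print(results)
```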
+ # Some suggestions: + # - If you're using Delayed then append all tasks to a list and execute them with `dask.compute(*command_list)` + # - Visualize task tree with `dask.visualize(*command_list, filename=str('filename.pdf')) + # - Add dependencies manually with `dask.graph_manipulation.bind(dependent_task, list_of_dependencies)` + # End of Computation +# ============================================================================== + +if __name__ == '__main__': + from timeit import default_timer + start = default_timer() + main() + print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From 6120ad3a6df5cc2fdcc23137fddddba7b306e374 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Mon, 30 Oct 2023 15:19:23 -0400 Subject: [PATCH 04/16] Refactor slice & dset_3d scripts with common structure The two scripts now have nearly identical CLI and structure --- python_scripts/cat_dset_3D.py | 244 ++++++++++++++++++++-------------- python_scripts/cat_slice.py | 140 +++++++++---------- 2 files changed, 205 insertions(+), 179 deletions(-) mode change 100644 => 100755 python_scripts/cat_slice.py diff --git a/python_scripts/cat_dset_3D.py b/python_scripts/cat_dset_3D.py index 7c403933e..959d692ae 100755 --- a/python_scripts/cat_dset_3D.py +++ b/python_scripts/cat_dset_3D.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ Python script for concatenating 3D hdf5 datasets. Includes a CLI for concatenating Cholla HDF5 datasets and can be -imported into other scripts where the `concat_3d` function can be used to concatenate the datasets. +imported into other scripts where the `concat_3d_field` function can be used to concatenate the datasets. Generally the easiest way to import this script is to add the `python_scripts` directory to your python path in your script like this: @@ -18,85 +18,10 @@ import pathlib # ====================================================================================================================== -def main(): - """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the - command line. If you're importing this file then use the `concat_3d` or `concat_3d_single` functions directly. - """ - # Argument handling - cli = argparse.ArgumentParser() - # Required Arguments - cli.add_argument('-s', '--start_num', type=int, required=True, help='The first output step to concatenate') - cli.add_argument('-e', '--end_num', type=int, required=True, help='The last output step to concatenate') - cli.add_argument('-n', '--num_processes', type=int, required=True, help='The number of processes that were used') - # Optional Arguments - cli.add_argument('-i', '--input_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The input directory.') - cli.add_argument('-o', '--output_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The output directory.') - cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.') - cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') - cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') - cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. 
Defaults to None.') - args = cli.parse_args() - - # Perform the concatenation - concat_3d(start_num=args.start_num, - end_num=args.end_num, - num_processes=args.num_processes, - input_dir=args.input_dir, - output_dir=args.output_dir, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts) -# ====================================================================================================================== - -# ====================================================================================================================== -def concat_3d(start_num: int, - end_num: int, - num_processes: int, - input_dir: pathlib.Path = pathlib.Path.cwd(), - output_dir: pathlib.Path = pathlib.Path.cwd(), - skip_fields: list = [], - destination_dtype: np.dtype = None, - compression_type: str = None, - compression_options: str = None): - """Concatenate 3D HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a - single, large file. All outputs from start_num to end_num will be concatenated. - - Args: - start_num (int): The first output step to concatenate - end_num (int): The last output step to concatenate - num_processes (int): The number of processes that were used - input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). - output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). - skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. - destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. - compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. - compression_options (str, optional): What compression settings to use if compressing. Defaults to None. - """ - - # Error checking - assert start_num >= 0, 'start_num must be greater than or equal to 0' - assert end_num >= 0, 'end_num must be greater than or equal to 0' - assert start_num <= end_num, 'end_num should be greater than or equal to start_num' - assert num_processes > 1, 'num_processes must be greater than 1' - - # loop over outputs - for n in range(start_num, end_num+1): - concat_3d_single(output_number=n, - num_processes=num_processes, - input_dir=input_dir, - output_dir=output_dir, - skip_fields=skip_fields, - destination_dtype=destination_dtype, - compression_type=compression_type, - compression_options=compression_options) -# ====================================================================================================================== - -# ====================================================================================================================== -def concat_3d_single(output_number: int, +def concat_3d_output(source_directory: pathlib.Path, + output_directory: pathlib.Path, num_processes: int, - input_dir: pathlib.Path = pathlib.Path.cwd(), - output_dir: pathlib.Path = pathlib.Path.cwd(), + output_number: int, skip_fields: list = [], destination_dtype: np.dtype = None, compression_type: str = None, @@ -105,11 +30,10 @@ def concat_3d_single(output_number: int, single, large file. Args: - output_number (int): The output - end_num (int): The last output step to concatenate - num_processes (int): The number of processes that were used - input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). 
- output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). + source_directory (pathlib.Path): The directory containing the unconcatenated files + output_directory (pathlib.Path): The directory containing the new concatenated files + num_processes (int): The number of ranks that Cholla was run with + output_number (int): The output number to concatenate skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. @@ -120,13 +44,13 @@ def concat_3d_single(output_number: int, assert num_processes > 1, 'num_processes must be greater than 1' assert output_number >= 0, 'output_number must be greater than or equal to 0' - # open the output file for writing (don't overwrite if exists) - fileout = h5py.File(output_dir / f'{output_number}.h5', 'a') + # open the output file for writing (fail if it exists) + destination_file = h5py.File(output_directory / f'{output_number}.h5', 'w-') # Setup the output file - with h5py.File(input_dir / f'{output_number}.h5.0', 'r') as source_file: + with h5py.File(source_directory / f'{output_number}.h5.0', 'r') as source_file: # Copy header data - fileout = copy_header(source_file, fileout) + destination_file = copy_header(source_file, destination_file) # Create the datasets in the output file datasets_to_copy = list(source_file.keys()) @@ -137,29 +61,42 @@ def concat_3d_single(output_number: int, data_shape = source_file.attrs['dims'] - fileout.create_dataset(name=dataset, - shape=data_shape, - dtype=dtype, - compression=compression_type, - compression_opts=compression_options) + if dataset == 'magnetic_x': data_shape[0] += 1 + if dataset == 'magnetic_y': data_shape[1] += 1 + if dataset == 'magnetic_z': data_shape[2] += 1 + + destination_file.create_dataset(name=dataset, + shape=data_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) # loop over files for a given output for i in range(0, num_processes): # open the input file for reading - filein = h5py.File(input_dir / f'{output_number}.h5.{i}', 'r') - # read in the header data from the input file - head = filein.attrs + source_file = h5py.File(source_directory / f'{output_number}.h5.{i}', 'r') - # write data from individual processor file to correct location in concatenated file - nx_local, ny_local, nz_local = filein.attrs['dims_local'] - x_start, y_start, z_start = filein.attrs['offset'] + # Compute the offset slicing + nx_local, ny_local, nz_local = source_file.attrs['dims_local'] + x_start, y_start, z_start = source_file.attrs['offset'] + x_end, y_end, z_end = x_start+nx_local, y_start+ny_local, z_start+nz_local + # write data from individual processor file to correct location in concatenated file for dataset in datasets_to_copy: - fileout[dataset][x_start:x_start+nx_local, y_start:y_start+ny_local,z_start:z_start+nz_local] = filein[dataset] + magnetic_offset = [0,0,0] + if dataset == 'magnetic_x': magnetic_offset[0] = 1 + if dataset == 'magnetic_y': magnetic_offset[1] = 1 + if dataset == 'magnetic_z': magnetic_offset[2] = 1 + + destination_file[dataset][x_start:x_end+magnetic_offset[0], + y_start:y_end+magnetic_offset[1], + z_start:z_end+magnetic_offset[2]] = source_file[dataset] - filein.close() + # Now that the copy is done we close the source file + 
source_file.close() - fileout.close() + # Close destination file now that it is fully constructed + destination_file.close() # ====================================================================================================================== # ============================================================================== @@ -182,5 +119,106 @@ def copy_header(source_file: h5py.File, destination_file: h5py.File): return destination_file # ============================================================================== +# ============================================================================== +def common_cli() -> argparse.ArgumentParser: + """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an + `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method + is used. + """ + + # ============================================================================ + # Function used to parse the `--concat-output` argument + def concat_output(raw_argument: str) -> list: + # Check if the string is empty + if len(raw_argument) < 1: + raise ValueError('The --concat-output argument must not be of length zero.') + + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('[', '') + cleaned_argument = cleaned_argument.replace(']', '') + + # Check that it only has the allowed characters + allowed_charaters = set('0123456789,-') + if not set(cleaned_argument).issubset(allowed_charaters): + raise ValueError("Argument contains incorrect characters. Should only contain '0-9', ',', and '-'.") + + # Split on commas + cleaned_argument = cleaned_argument.split(',') + + # Generate the final list + iterable_argument = set() + for arg in cleaned_argument: + if '-' not in arg: + if int(arg) < 0: + raise ValueError() + iterable_argument.add(int(arg)) + else: + start, end = arg.split('-') + start, end = int(start), int(end) + if end < start: + raise ValueError('The end of a range must be larger than the start of the range.') + if start < 0: + raise ValueError() + iterable_argument = iterable_argument.union(set(range(start, end+1))) + + return iterable_argument + # ============================================================================ + + # ============================================================================ + def positive_int(raw_argument: str) -> int: + arg = int(raw_argument) + if arg < 0: + raise ValueError('Argument must be 0 or greater.') + + return arg + # ============================================================================ + + # ============================================================================ + def skip_fields(raw_argument: str) -> list: + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('[', '') + cleaned_argument = cleaned_argument.replace(']', '') + cleaned_argument = cleaned_argument.split(',') + + return cleaned_argument + # ============================================================================ + + # Initialize the CLI + cli = argparse.ArgumentParser() + + # Required Arguments + cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the directory for the source HDF5 files.') + cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.') + cli.add_argument('-n', '--num-processes', 
type=positive_int, required=True, help='The number of processes that were used') + cli.add_argument('-c', '--concat-outputs', type=concat_output, required=True, help='Which outputs to concatenate. Can be a single number (e.g. 8), a range (e.g. 2-9), or a list (e.g. [1,2,3]). Ranges are inclusive') + + # Optional Arguments + cli.add_argument('--skip-fields', type=skip_fields, default=[], help='List of fields to skip concatenating. Defaults to empty.') + cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') + cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') + cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') + + return cli +# ============================================================================== + if __name__ == '__main__': - main() + from timeit import default_timer + start = default_timer() + + cli = common_cli() + args = cli.parse_args() + + # Perform the concatenation + for output in args.concat_outputs: + concat_3d_output(source_directory=args.source_directory, + output_directory=args.output_directory, + num_processes=args.num_processes, + output_number=output, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts) + + print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') diff --git a/python_scripts/cat_slice.py b/python_scripts/cat_slice.py old mode 100644 new mode 100755 index 51aae2d6d..88f66ea2f --- a/python_scripts/cat_slice.py +++ b/python_scripts/cat_slice.py @@ -19,48 +19,12 @@ import pathlib import numpy as np -from cat_dset_3D import copy_header - -# ============================================================================== -def main(): - """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the - command line. If you're importing this file then use the `concat_slice` function directly. - """ - # Argument handling - cli = argparse.ArgumentParser() - # Required Arguments - cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the source HDF5 files.') - cli.add_argument('-o', '--output-file', type=pathlib.Path, required=True, help='The path and filename of the concatenated file.') - cli.add_argument('-n', '--num-processes', type=int, required=True, help='The number of processes that were used to generate the slices.') - cli.add_argument('-t', '--output-num', type=int, required=True, help='The output number to be concatenated') - # Optional Arguments - cli.add_argument('--xy', type=bool, default=True, help='If True then concatenate the XY slice. Defaults to True.') - cli.add_argument('--yz', type=bool, default=True, help='If True then concatenate the YZ slice. Defaults to True.') - cli.add_argument('--xz', type=bool, default=True, help='If True then concatenate the XZ slice. Defaults to True.') - cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.') - cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. 
Defaults to the same as the input datasets.') - cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') - cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') - args = cli.parse_args() - - # Perform the concatenation - concat_slice(source_directory=args.source_directory, - destination_file_path=args.output_file, - num_ranks=args.num_processses, - output_number=args.output_num, - concat_xy=args.xy, - concat_yz=args.yz, - concat_xz=args.xz, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts) -# ============================================================================== +from cat_dset_3D import copy_header, common_cli # ============================================================================== def concat_slice(source_directory: pathlib.Path, - destination_file_path: pathlib.Path, - num_ranks: int, + output_directory: pathlib.Path, + num_processes: int, output_number: int, concat_xy: bool = True, concat_yz: bool = True, @@ -76,8 +40,8 @@ def concat_slice(source_directory: pathlib.Path, Args: source_directory (pathlib.Path): The directory containing the unconcatenated files - destination_file_path (pathlib.Path): The path and name of the new concatenated file - num_ranks (int): The number of ranks that Cholla was run with + output_directory (pathlib.Path): The directory containing the new concatenated files + num_processes (int): The number of ranks that Cholla was run with output_number (int): The output number to concatenate concat_xy (bool, optional): If True then concatenate the XY slice. Defaults to True. concat_yz (bool, optional): If True then concatenate the YZ slice. Defaults to True. @@ -87,53 +51,57 @@ def concat_slice(source_directory: pathlib.Path, compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. compression_options (str, optional): What compression settings to use if compressing. Defaults to None. 
""" - # Open destination file and first file for getting metadata - source_file = h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') - destination_file = h5py.File(destination_file_path, 'w') - - # Copy over header - destination_file = copy_header(source_file, destination_file) - # Get a list of all datasets in the source file - datasets_to_copy = list(source_file.keys()) + # Error checking + assert num_processes > 1, 'num_processes must be greater than 1' + assert output_number >= 0, 'output_number must be greater than or equal to 0' - # Filter the datasets to only include those I wish to copy - if not concat_xy: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xy' in dataset] - if not concat_yz: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'yz' in dataset] - if not concat_xz: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xz' in dataset] - datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] - - # Create the datasets in the destination file - for dataset in datasets_to_copy: - dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype - - slice_shape = get_slice_shape(source_file, dataset) + # Open destination file and first file for getting metadata + destination_file = h5py.File(output_directory / f'{output_number}_slice.h5', 'w-') + + # Setup the output file + with h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') as source_file: + # Copy over header + destination_file = copy_header(source_file, destination_file) + + # Get a list of all datasets in the source file + datasets_to_copy = list(source_file.keys()) + + # Filter the datasets to only include those that need to be copied + if not concat_xy: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xy' in dataset] + if not concat_yz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'yz' in dataset] + if not concat_xz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xz' in dataset] + datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] + + # Create the datasets in the destination file + for dataset in datasets_to_copy: + dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype - destination_file.create_dataset(name=dataset, - shape=slice_shape, - dtype=dtype, - compression=compression_type, - compression_opts=compression_options) + slice_shape = __get_slice_shape(source_file, dataset) - # Close source file in prep for looping through source files - source_file.close() + destination_file.create_dataset(name=dataset, + shape=slice_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) # Copy data - for rank in range(num_ranks): + for rank in range(num_processes): # Open source file source_file = h5py.File(source_directory / f'{output_number}_slice.h5.{rank}', 'r') # Loop through and copy datasets for dataset in datasets_to_copy: # Determine locations and shifts for writing - (i0_start, i0_end, i1_start, i1_end), file_in_slice = write_bounds(source_file, dataset) + (i0_start, i0_end, i1_start, i1_end), file_in_slice = __write_bounds_slice(source_file, dataset) if file_in_slice: # Copy the data - destination_file[dataset][i0_start:i0_end, i1_start:i1_end] = source_file[dataset] + destination_file[dataset][i0_start:i0_end, + i1_start:i1_end] = source_file[dataset] # Now that the copy is done we close the source 
file source_file.close() @@ -143,7 +111,7 @@ def concat_slice(source_directory: pathlib.Path, # ============================================================================== # ============================================================================== -def get_slice_shape(source_file: h5py.File, dataset: str): +def __get_slice_shape(source_file: h5py.File, dataset: str): """Determine the shape of the full slice in a dataset Args: @@ -171,7 +139,7 @@ def get_slice_shape(source_file: h5py.File, dataset: str): # ============================================================================== # ============================================================================== -def write_bounds(source_file: h5py.File, dataset: str): +def __write_bounds_slice(source_file: h5py.File, dataset: str): """Determine the bounds of the concatenated file to write to Args: @@ -206,5 +174,25 @@ def write_bounds(source_file: h5py.File, dataset: str): if __name__ == '__main__': from timeit import default_timer start = default_timer() - main() + + cli = common_cli() + cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concating the XY slice.') + cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concating the YZ slice.') + cli.add_argument('--disable-xz', default=True, action='store_false', help='Disables concating the XZ slice.') + args = cli.parse_args() + + # Perform the concatenation + for output in args.concat_outputs: + concat_slice(source_directory=args.source_directory, + output_directory=args.output_directory, + num_processes=args.num_processes, + output_number=output, + concat_xy=args.disable_xy, + concat_yz=args.disable_yz, + concat_xz=args.disable_xz, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts) + print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From ebd9ef0988cdca6d9a8e4d66f8f528108a07f82c Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Tue, 31 Oct 2023 11:17:14 -0400 Subject: [PATCH 05/16] Add chunking option to concatenation scripts --- python_scripts/cat_dset_3D.py | 25 +++++++++++++++++++++++-- python_scripts/cat_slice.py | 8 ++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/python_scripts/cat_dset_3D.py b/python_scripts/cat_dset_3D.py index 959d692ae..9e6cff693 100755 --- a/python_scripts/cat_dset_3D.py +++ b/python_scripts/cat_dset_3D.py @@ -25,7 +25,8 @@ def concat_3d_output(source_directory: pathlib.Path, skip_fields: list = [], destination_dtype: np.dtype = None, compression_type: str = None, - compression_options: str = None): + compression_options: str = None, + chunking = None): """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a single, large file. @@ -38,6 +39,7 @@ def concat_3d_output(source_directory: pathlib.Path, destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. compression_options (str, optional): What compression settings to use if compressing. Defaults to None. + chunking (bool or tuple, optional): Whether or not to use chunking and the chunk size. Defaults to None. 
""" # Error checking @@ -68,6 +70,7 @@ def concat_3d_output(source_directory: pathlib.Path, destination_file.create_dataset(name=dataset, shape=data_shape, dtype=dtype, + chunks=chunking, compression=compression_type, compression_opts=compression_options) @@ -185,6 +188,22 @@ def skip_fields(raw_argument: str) -> list: return cleaned_argument # ============================================================================ + # ============================================================================ + def chunk_arg(raw_argument: str): + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('(', '') + cleaned_argument = cleaned_argument.replace(')', '') + + # Check that it only has the allowed characters + allowed_charaters = set('0123456789,') + if not set(cleaned_argument).issubset(allowed_charaters): + raise ValueError("Argument contains incorrect characters. Should only contain '0-9', ',', and '-'.") + + # Convert to a tuple and return + return tuple([int(i) for i in cleaned_argument.split(',')]) + # ============================================================================ + # Initialize the CLI cli = argparse.ArgumentParser() @@ -199,6 +218,7 @@ def skip_fields(raw_argument: str) -> list: cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') + cli.add_argument('--chunking', type=chunk_arg, default=None, nargs='?', const=True, help='Enable chunking of the output file. Default is `False`. If set without an argument then the chunk size will be automatically chosen or a tuple can be passed to indicate the chunk size desired.') return cli # ============================================================================== @@ -219,6 +239,7 @@ def skip_fields(raw_argument: str) -> list: skip_fields=args.skip_fields, destination_dtype=args.dtype, compression_type=args.compression_type, - compression_options=args.compression_opts) + compression_options=args.compression_opts, + chunking=args.chunking) print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') diff --git a/python_scripts/cat_slice.py b/python_scripts/cat_slice.py index 88f66ea2f..9f608a96e 100755 --- a/python_scripts/cat_slice.py +++ b/python_scripts/cat_slice.py @@ -32,7 +32,8 @@ def concat_slice(source_directory: pathlib.Path, skip_fields: list = [], destination_dtype: np.dtype = None, compression_type: str = None, - compression_options: str = None): + compression_options: str = None, + chunking = None): """Concatenate slice HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a single, large file. This function concatenates a single output time and can be called multiple times, @@ -50,6 +51,7 @@ def concat_slice(source_directory: pathlib.Path, destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. compression_options (str, optional): What compression settings to use if compressing. Defaults to None. 
+ chunking (bool or tuple, optional): Whether or not to use chunking and the chunk size. Defaults to None. """ # Error checking @@ -85,6 +87,7 @@ def concat_slice(source_directory: pathlib.Path, destination_file.create_dataset(name=dataset, shape=slice_shape, dtype=dtype, + chunks=chunking, compression=compression_type, compression_opts=compression_options) @@ -193,6 +196,7 @@ def __write_bounds_slice(source_file: h5py.File, dataset: str): skip_fields=args.skip_fields, destination_dtype=args.dtype, compression_type=args.compression_type, - compression_options=args.compression_opts) + compression_options=args.compression_opts, + chunking=args.chunking) print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From bb8c39a4c5925a42da944375e10cf5d1272032bc Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sat, 11 Nov 2023 10:47:26 -0500 Subject: [PATCH 06/16] Update cat_slice to work with projection data --- python_scripts/cat_slice.py | 133 ++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/python_scripts/cat_slice.py b/python_scripts/cat_slice.py index 9f608a96e..152b4db7c 100755 --- a/python_scripts/cat_slice.py +++ b/python_scripts/cat_slice.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """ -Python script for concatenating slice hdf5 datasets for when -DSLICES is turned -on in Cholla. Includes a CLI for concatenating Cholla HDF5 datasets and can be -imported into other scripts where the `concat_slice` function can be used to -concatenate the HDF5 files. +Python script for concatenating 2D hdf5 datasets for when -DSLICES, +-DPROJECTION, or -DROTATED_PROJECTION is turned on in Cholla. Includes a CLI for +concatenating Cholla HDF5 datasets and can be imported into other scripts where +the `concat_2d_dataset` function can be used to concatenate the HDF5 files. Generally the easiest way to import this script is to add the `python_scripts` directory to your python path in your script like this: @@ -22,19 +22,20 @@ from cat_dset_3D import copy_header, common_cli # ============================================================================== -def concat_slice(source_directory: pathlib.Path, - output_directory: pathlib.Path, - num_processes: int, - output_number: int, - concat_xy: bool = True, - concat_yz: bool = True, - concat_xz: bool = True, - skip_fields: list = [], - destination_dtype: np.dtype = None, - compression_type: str = None, - compression_options: str = None, - chunking = None): - """Concatenate slice HDF5 Cholla datasets. i.e. take the single files +def concat_2d_dataset(source_directory: pathlib.Path, + output_directory: pathlib.Path, + num_processes: int, + output_number: int, + dataset_kind: str, + concat_xy: bool = True, + concat_yz: bool = True, + concat_xz: bool = True, + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None, + chunking = None): + """Concatenate 2D HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a single, large file. This function concatenates a single output time and can be called multiple times, potentially in parallel, to concatenate multiple output times. 
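Because each output time is written to its own file, a driver script can fan the calls out across a process pool. Below is a minimal sketch of that pattern, assuming the `python_scripts` directory is on `sys.path` and the raw files sit in `./hdf5/raw`; the paths, rank count, and output range are illustrative, and at this point in the series the function still lives in `cat_slice.py` (it moves to `concat_2d_data.py` in a later patch).

```
import pathlib
from multiprocessing import Pool

from cat_slice import concat_2d_dataset

def concat_one(output_number):
    # Each call writes its own <output_number>_slice.h5, so different output
    # times can be concatenated independently of one another.
    concat_2d_dataset(source_directory=pathlib.Path('./hdf5/raw'),
                      output_directory=pathlib.Path('./hdf5'),
                      num_processes=16,            # ranks Cholla was run with
                      output_number=output_number,
                      dataset_kind='slice')

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        pool.map(concat_one, range(0, 11))
```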
@@ -44,9 +45,10 @@ def concat_slice(source_directory: pathlib.Path, output_directory (pathlib.Path): The directory containing the new concatenated files num_processes (int): The number of ranks that Cholla was run with output_number (int): The output number to concatenate - concat_xy (bool, optional): If True then concatenate the XY slice. Defaults to True. - concat_yz (bool, optional): If True then concatenate the YZ slice. Defaults to True. - concat_xz (bool, optional): If True then concatenate the XZ slice. Defaults to True. + dataset_kind (str): The type of 2D dataset to concatenate. Can be 'slice', 'proj', or 'rot_proj'. + concat_xy (bool, optional): If True then concatenate the XY slices/projections. Defaults to True. + concat_yz (bool, optional): If True then concatenate the YZ slices/projections. Defaults to True. + concat_xz (bool, optional): If True then concatenate the XZ slices/projections. Defaults to True. skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. @@ -57,12 +59,13 @@ def concat_slice(source_directory: pathlib.Path, # Error checking assert num_processes > 1, 'num_processes must be greater than 1' assert output_number >= 0, 'output_number must be greater than or equal to 0' + assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".' - # Open destination file and first file for getting metadata - destination_file = h5py.File(output_directory / f'{output_number}_slice.h5', 'w-') + # Open destination file + destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w-') - # Setup the output file - with h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') as source_file: + # Setup the destination file + with h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.0', 'r') as source_file: # Copy over header destination_file = copy_header(source_file, destination_file) @@ -79,13 +82,19 @@ def concat_slice(source_directory: pathlib.Path, datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] # Create the datasets in the destination file + zero_array = np.zeros(1) for dataset in datasets_to_copy: dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype - slice_shape = __get_slice_shape(source_file, dataset) + dataset_shape = __get_2d_dataset_shape(source_file, dataset) + + # Create array to initialize data to zero, this is required for projections + if zero_array.shape != dataset_shape: + zero_array = np.zeros(dataset_shape) destination_file.create_dataset(name=dataset, - shape=slice_shape, + shape=dataset_shape, + data=zero_array, dtype=dtype, chunks=chunking, compression=compression_type, @@ -94,17 +103,21 @@ def concat_slice(source_directory: pathlib.Path, # Copy data for rank in range(num_processes): # Open source file - source_file = h5py.File(source_directory / f'{output_number}_slice.h5.{rank}', 'r') + source_file = h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.{rank}', 'r') # Loop through and copy datasets for dataset in datasets_to_copy: # Determine locations and shifts for writing - (i0_start, i0_end, i1_start, i1_end), file_in_slice = __write_bounds_slice(source_file, dataset) + (i0_start, 
i0_end, i1_start, i1_end), file_in_slice = __write_bounds_2d_dataset(source_file, dataset) + + # If this is a slice dataset we can skip loading the source file if that + # file isn't in the slice + if dataset_kind == 'slice' and not file_in_slice: + continue - if file_in_slice: - # Copy the data - destination_file[dataset][i0_start:i0_end, - i1_start:i1_end] = source_file[dataset] + # Copy the data, the summation is required for projections but not slices + destination_file[dataset][i0_start:i0_end, + i1_start:i1_end] += source_file[dataset] # Now that the copy is done we close the source file source_file.close() @@ -114,35 +127,35 @@ def concat_slice(source_directory: pathlib.Path, # ============================================================================== # ============================================================================== -def __get_slice_shape(source_file: h5py.File, dataset: str): - """Determine the shape of the full slice in a dataset +def __get_2d_dataset_shape(source_file: h5py.File, dataset: str): + """Determine the shape of the full 2D dataset Args: source_file (h5py.File): The source file the get the shape information from dataset (str): The dataset to get the shape of Raises: - ValueError: If the dataset name isn't a slice name + ValueError: If the dataset name isn't a 2D dataset name Returns: - tuple: The 2D dimensions of the slice + tuple: The dimensions of the dataset """ nx, ny, nz = source_file.attrs['dims'] - +#TODO update this rot proj if 'xy' in dataset: - slice_dimensions = (nx, ny) + dimensions = (nx, ny) elif 'yz' in dataset: - slice_dimensions = (ny, nz) + dimensions = (ny, nz) elif 'xz' in dataset: - slice_dimensions = (nx, nz) + dimensions = (nx, nz) else: raise ValueError(f'Dataset "{dataset}" is not a slice.') - return slice_dimensions + return dimensions # ============================================================================== # ============================================================================== -def __write_bounds_slice(source_file: h5py.File, dataset: str): +def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str): """Determine the bounds of the concatenated file to write to Args: @@ -150,7 +163,7 @@ def __write_bounds_slice(source_file: h5py.File, dataset: str): dataset (str): The name of the dataset to read from the source file Raises: - ValueError: If the dataset name isn't a slice name + ValueError: If the dataset name isn't a 2D dataset name Returns: tuple: The write bounds for the concatenated file to be used like `output_file[dataset][return[0]:return[1], return[2]:return[3]] @@ -169,7 +182,7 @@ def __write_bounds_slice(source_file: h5py.File, dataset: str): file_in_slice = y_start <= ny//2 <= y_start+ny_local bounds = (x_start, x_start+nx_local, z_start, z_start+nz_local) else: - raise ValueError(f'Dataset "{dataset}" is not a slice.') + raise ValueError(f'Dataset "{dataset}" is not a slice or projection.') return bounds, file_in_slice # ============================================================================== @@ -179,24 +192,26 @@ def __write_bounds_slice(source_file: h5py.File, dataset: str): start = default_timer() cli = common_cli() - cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concating the XY slice.') - cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concating the YZ slice.') - cli.add_argument('--disable-xz', default=True, action='store_false', help='Disables concating the XZ slice.') + cli.add_argument('-d', 
'--dataset-kind', type=str, required=True, help='What kind of 2D dataset to concatnate. Options are "slice", "proj", and "rot_proj"') + cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concating the XY datasets.') + cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concating the YZ datasets.') + cli.add_argument('--disable-xz', default=True, action='store_false', help='Disables concating the XZ datasets.') args = cli.parse_args() # Perform the concatenation for output in args.concat_outputs: - concat_slice(source_directory=args.source_directory, - output_directory=args.output_directory, - num_processes=args.num_processes, - output_number=output, - concat_xy=args.disable_xy, - concat_yz=args.disable_yz, - concat_xz=args.disable_xz, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts, - chunking=args.chunking) + concat_2d_dataset(source_directory=args.source_directory, + output_directory=args.output_directory, + num_processes=args.num_processes, + output_number=output, + dataset_kind=args.dataset_kind, + concat_xy=args.disable_xy, + concat_yz=args.disable_yz, + concat_xz=args.disable_xz, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts, + chunking=args.chunking) print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From 47df926f743f5bf0ba8a221334d0947c44a3b551 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sat, 11 Nov 2023 10:54:22 -0500 Subject: [PATCH 07/16] Rename concat scripts for clarity delete cat_projection.py as it has been superseded by concat_2d_data.py --- python_scripts/cat_projection.py | 67 ------------------- .../{cat_slice.py => concat_2d_data.py} | 6 +- .../{cat_dset_3D.py => concat_3d_data.py} | 2 +- 3 files changed, 4 insertions(+), 71 deletions(-) delete mode 100755 python_scripts/cat_projection.py rename python_scripts/{cat_slice.py => concat_2d_data.py} (98%) rename python_scripts/{cat_dset_3D.py => concat_3d_data.py} (99%) diff --git a/python_scripts/cat_projection.py b/python_scripts/cat_projection.py deleted file mode 100755 index 29b56a416..000000000 --- a/python_scripts/cat_projection.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# Example file for concatenating on-axis projection data -# created when the -DPROJECTION flag is turned on - -import h5py -import numpy as np - -ns = 0 -ne = 0 -n_procs = 16 # number of processors that did the cholla calculation -dnamein = './hdf5/raw/' -dnameout = './hdf5/' - -# loop over the output times -for n in range(ns, ne+1): - - # open the output file for writing - fileout = h5py.File(dnameout+str(n)+'_proj.h5', 'w') - - # loop over files for a given output time - for i in range(0, n_procs): - - # open the input file for reading - filein = h5py.File(dnamein+str(n)+'_proj.h5.'+str(i), 'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['dims'] = [nx, ny, nz] - fileout.attrs['gamma'] = [head['gamma'][0]] - fileout.attrs['t'] = [head['t'][0]] - fileout.attrs['dt'] = [head['dt'][0]] - fileout.attrs['n_step'] = [head['n_step'][0]] - - dxy = np.zeros((nx,ny)) - dxz = np.zeros((nx,nz)) - Txy = np.zeros((nx,ny)) - 
Txz = np.zeros((nx,nz)) - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - - dxy[xs:xs+nxl,ys:ys+nyl] += filein['d_xy'] - dxz[xs:xs+nxl,zs:zs+nzl] += filein['d_xz'] - Txy[xs:xs+nxl,ys:ys+nyl] += filein['T_xy'] - Txz[xs:xs+nxl,zs:zs+nzl] += filein['T_xz'] - - filein.close() - - # write out the new datasets - fileout.create_dataset('d_xy', data=dxy) - fileout.create_dataset('d_xz', data=dxz) - fileout.create_dataset('T_xy', data=Txy) - fileout.create_dataset('T_xz', data=Txz) - - fileout.close() diff --git a/python_scripts/cat_slice.py b/python_scripts/concat_2d_data.py similarity index 98% rename from python_scripts/cat_slice.py rename to python_scripts/concat_2d_data.py index 152b4db7c..04a67da86 100755 --- a/python_scripts/cat_slice.py +++ b/python_scripts/concat_2d_data.py @@ -10,7 +10,7 @@ ``` import sys sys.path.append('/PATH/TO/CHOLLA/python_scripts') -import cat_slice +import concat_2d_data ``` """ @@ -19,7 +19,7 @@ import pathlib import numpy as np -from cat_dset_3D import copy_header, common_cli +from concat_3d_data import copy_header, common_cli # ============================================================================== def concat_2d_dataset(source_directory: pathlib.Path, @@ -62,7 +62,7 @@ def concat_2d_dataset(source_directory: pathlib.Path, assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".' # Open destination file - destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w-') + destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w') # Setup the destination file with h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.0', 'r') as source_file: diff --git a/python_scripts/cat_dset_3D.py b/python_scripts/concat_3d_data.py similarity index 99% rename from python_scripts/cat_dset_3D.py rename to python_scripts/concat_3d_data.py index 9e6cff693..8eaded7df 100755 --- a/python_scripts/cat_dset_3D.py +++ b/python_scripts/concat_3d_data.py @@ -8,7 +8,7 @@ ``` import sys sys.path.append('/PATH/TO/CHOLLA/python_scripts') -import cat_dset_3D +import concat_3d_data ``` """ From ee587e9f08ad71e8773104eb68b31d517eaf78ff Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sat, 11 Nov 2023 21:09:49 -0700 Subject: [PATCH 08/16] Add rotated projection support to concat_2d_data.py Also, removed cat_rotated_projection.py as it is now superseded by the exhanced funcionality of concat_2d_data.py --- python_scripts/cat_rotated_projection.py | 85 ------------------------ python_scripts/concat_2d_data.py | 12 +++- 2 files changed, 10 insertions(+), 87 deletions(-) delete mode 100755 python_scripts/cat_rotated_projection.py diff --git a/python_scripts/cat_rotated_projection.py b/python_scripts/cat_rotated_projection.py deleted file mode 100755 index 6e769ce55..000000000 --- a/python_scripts/cat_rotated_projection.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -# Example file for concatenating rotated projection data -# created when the -DROTATED_PROJECTION flag is turned on - -import h5py -import numpy as np - -ns = 0 -ne = 0 -n_procs = 16 # number of processors that did the cholla calculation -dnamein = './hdf5/raw/' -dnameout = './hdf5/' - -# loop over the output times -for n in range(ns, ne+1): - - # open the output file for writing - fileout 
= h5py.File(dnameout+str(n)+'_rot_proj.h5', 'w') - - # loop over files for a given output time - for i in range(0, n_procs): - - # open the input file for reading - filein = h5py.File(dnamein+str(n)+'_rot_proj.h5.'+str(i), 'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the arrays to hold the output data - if (i == 0): - nxr = int(head['nxr']) - nzr = int(head['nzr']) - Lx = head['Lx'] - Lz = head['Lz'] - delta = head['delta'] - theta = head['theta'] - phi = head['phi'] - gamma = head['gamma'] - t = head['t'] - dt = head['dt'] - n_step = head['n_step'] - fileout.attrs['nxr'] = nxr - fileout.attrs['nzr'] = nzr - fileout.attrs['Lx'] = Lx - fileout.attrs['Lz'] = Lz - fileout.attrs['delta'] = delta - fileout.attrs['theta'] = theta - fileout.attrs['phi'] = phi - fileout.attrs['gamma'] = gamma - fileout.attrs['t'] = t - fileout.attrs['dt'] = dt - fileout.attrs['n_step'] = n_step - - d_xzr = np.zeros((nxr, nzr)) - vx_xzr = np.zeros((nxr, nzr)) - vy_xzr = np.zeros((nxr, nzr)) - vz_xzr = np.zeros((nxr, nzr)) - T_xzr = np.zeros((nxr, nzr)) - - # write data from individual processor file to - # correct location in concatenated file - nx_min = int(head['nx_min']) - nx_max = int(head['nx_max']) - nz_min = int(head['nz_min']) - nz_max = int(head['nz_max']) - - d_xzr[nx_min:nx_max,nz_min:nz_max] += filein['d_xzr'][:] - vx_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vx_xzr'][:] - vy_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vy_xzr'][:] - vz_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vz_xzr'][:] - T_xzr[nx_min:nx_max,nz_min:nz_max] += filein['T_xzr'][:] - - filein.close() - - # write out the new datasets - fileout.create_dataset("d_xzr", data=d_xzr) - fileout.create_dataset("vx_xzr", data=vx_xzr) - fileout.create_dataset("vy_xzr", data=vy_xzr) - fileout.create_dataset("vz_xzr", data=vz_xzr) - fileout.create_dataset("T_xzr", data=T_xzr) - - fileout.close() - - - diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py index 04a67da86..11e14e554 100755 --- a/python_scripts/concat_2d_data.py +++ b/python_scripts/concat_2d_data.py @@ -62,7 +62,7 @@ def concat_2d_dataset(source_directory: pathlib.Path, assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".' 
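The hunks that follow in this diff teach the shape and write-bounds helpers about rotated-projection ('xzr') datasets, using the same header attributes the deleted cat_rotated_projection.py relied on. A minimal sketch of that mapping for a single rank is shown below (the file name is hypothetical); tiles are summed rather than assigned because contributions from different ranks can overlap in the rotated image.

```
import h5py
import numpy as np

with h5py.File('0_rot_proj.h5.0', 'r') as rank_file:
    # The full rotated-projection image size comes from the header...
    nxr = int(rank_file.attrs['nxr'])
    nzr = int(rank_file.attrs['nzr'])
    full_image = np.zeros((nxr, nzr))

    # ...and each rank's tile is placed by its nx_min/nx_max, nz_min/nz_max bounds
    x0, x1 = int(rank_file.attrs['nx_min']), int(rank_file.attrs['nx_max'])
    z0, z1 = int(rank_file.attrs['nz_min']), int(rank_file.attrs['nz_max'])
    full_image[x0:x1, z0:z1] += rank_file['d_xzr'][:]
```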
# Open destination file - destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w') + destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w-') # Setup the destination file with h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.0', 'r') as source_file: @@ -140,8 +140,11 @@ def __get_2d_dataset_shape(source_file: h5py.File, dataset: str): Returns: tuple: The dimensions of the dataset """ + + if 'xzr' in dataset: + return (source_file.attrs['nxr'][0], source_file.attrs['nzr'][0]) + nx, ny, nz = source_file.attrs['dims'] -#TODO update this rot proj if 'xy' in dataset: dimensions = (nx, ny) elif 'yz' in dataset: @@ -168,6 +171,11 @@ def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str): Returns: tuple: The write bounds for the concatenated file to be used like `output_file[dataset][return[0]:return[1], return[2]:return[3]] """ + + if 'xzr' in dataset: + return (source_file.attrs['nx_min'][0], source_file.attrs['nx_max'][0], + source_file.attrs['nz_min'][0], source_file.attrs['nz_max'][0]), True + nx, ny, nz = source_file.attrs['dims'] nx_local, ny_local, nz_local = source_file.attrs['dims_local'] x_start, y_start, z_start = source_file.attrs['offset'] From 3a8983013d63630113b46ab901de3993b702be55 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sat, 11 Nov 2023 21:25:14 -0700 Subject: [PATCH 09/16] Add safer method of opening destination HDF5 file --- python_scripts/concat_2d_data.py | 4 ++-- python_scripts/concat_3d_data.py | 37 +++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py index 11e14e554..e64c77052 100755 --- a/python_scripts/concat_2d_data.py +++ b/python_scripts/concat_2d_data.py @@ -19,7 +19,7 @@ import pathlib import numpy as np -from concat_3d_data import copy_header, common_cli +from concat_3d_data import copy_header, common_cli, destination_safe_open # ============================================================================== def concat_2d_dataset(source_directory: pathlib.Path, @@ -62,7 +62,7 @@ def concat_2d_dataset(source_directory: pathlib.Path, assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".' 
# Open destination file - destination_file = h5py.File(output_directory / f'{output_number}_{dataset_kind}.h5', 'w-') + destination_file = destination_safe_open(output_directory / f'{output_number}_{dataset_kind}.h5') # Setup the destination file with h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.0', 'r') as source_file: diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py index 8eaded7df..6b9edc0bd 100755 --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -46,8 +46,8 @@ def concat_3d_output(source_directory: pathlib.Path, assert num_processes > 1, 'num_processes must be greater than 1' assert output_number >= 0, 'output_number must be greater than or equal to 0' - # open the output file for writing (fail if it exists) - destination_file = h5py.File(output_directory / f'{output_number}.h5', 'w-') + # Open the output file for writing + destination_file = destination_safe_open(output_directory / f'{output_number}.h5') # Setup the output file with h5py.File(source_directory / f'{output_number}.h5.0', 'r') as source_file: @@ -100,7 +100,38 @@ def concat_3d_output(source_directory: pathlib.Path, # Close destination file now that it is fully constructed destination_file.close() -# ====================================================================================================================== +# ============================================================================== + +# ============================================================================== +def destination_safe_open(filename: pathlib.Path) -> h5py.File: + """Opens a HDF5 file safely and provides useful error messages for some common failure modes + + Parameters + ---------- + filename : pathlib.Path + The full path and name of the file to open + + Returns + ------- + h5py.File + The opened HDF5 file object + + Raises + ------ + FileExistsError + Raises if the destination file already exists + """ + + try: + destination_file = h5py.File(filename, 'w-') + except FileExistsError: + # It might be better for this to simply print the error message and return + # rather than exiting. That way if a single call fails in a parallel + # environment it doesn't take down the entire job + raise FileExistsError(f'File "{filename}" already exists and will not be overwritten, skipping.') + + return destination_file +# ============================================================================== # ============================================================================== def copy_header(source_file: h5py.File, destination_file: h5py.File): From da1c6727be42a9f39000eb7d965e1d15eee585cb Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Sun, 12 Nov 2023 17:10:10 -0700 Subject: [PATCH 10/16] Convert to Numpy docstrings --- python_scripts/concat_2d_data.py | 76 ++++++++++++++++++----- python_scripts/concat_3d_data.py | 103 +++++++++++++++++++++++-------- 2 files changed, 136 insertions(+), 43 deletions(-) mode change 100755 => 100644 python_scripts/concat_2d_data.py mode change 100755 => 100644 python_scripts/concat_3d_data.py diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py old mode 100755 new mode 100644 index e64c77052..16f1668a0 --- a/python_scripts/concat_2d_data.py +++ b/python_scripts/concat_2d_data.py @@ -36,24 +36,66 @@ def concat_2d_dataset(source_directory: pathlib.Path, compression_options: str = None, chunking = None): """Concatenate 2D HDF5 Cholla datasets. i.e. 
take the single files - generated per process and concatenate them into a single, large file. This - function concatenates a single output time and can be called multiple times, - potentially in parallel, to concatenate multiple output times. + generated per process and concatenate them into a single, large file. This + function concatenates a single output time and can be called multiple times, + potentially in parallel, to concatenate multiple output times. + + Parameters + ---------- + source_directory : pathlib.Path + The directory containing the unconcatenated files + output_directory : pathlib.Path + The directory containing the new concatenated files + num_processes : int + The number of ranks that Cholla was run with + output_number : int + The output number to concatenate + dataset_kind : str + The type of 2D dataset to concatenate. Can be 'slice', 'proj', or 'rot_proj'. + concat_xy : bool + If True then concatenate the XY slices/projections. Defaults to True. + concat_yz : bool + If True then concatenate the YZ slices/projections. Defaults to True. + concat_xz : bool + If True then concatenate the XZ slices/projections. Defaults to True. + skip_fields : list + List of fields to skip concatenating. Defaults to []. + destination_dtype : np.dtype + The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. + compression_type : str + What kind of compression to use on the output data. Defaults to None. + compression_options : str + What compression settings to use if compressing. Defaults to None. + chunking : bool or tuple + Whether or not to use chunking and the chunk size. Defaults to None. + source_directory: pathlib.Path : + + output_directory: pathlib.Path : + + num_processes: int : + + output_number: int : + + dataset_kind: str : + + concat_xy: bool : + (Default value = True) + concat_yz: bool : + (Default value = True) + concat_xz: bool : + (Default value = True) + skip_fields: list : + (Default value = []) + destination_dtype: np.dtype : + (Default value = None) + compression_type: str : + (Default value = None) + compression_options: str : + (Default value = None) + + Returns + ------- - Args: - source_directory (pathlib.Path): The directory containing the unconcatenated files - output_directory (pathlib.Path): The directory containing the new concatenated files - num_processes (int): The number of ranks that Cholla was run with - output_number (int): The output number to concatenate - dataset_kind (str): The type of 2D dataset to concatenate. Can be 'slice', 'proj', or 'rot_proj'. - concat_xy (bool, optional): If True then concatenate the XY slices/projections. Defaults to True. - concat_yz (bool, optional): If True then concatenate the YZ slices/projections. Defaults to True. - concat_xz (bool, optional): If True then concatenate the XZ slices/projections. Defaults to True. - skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. - destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. - compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. - compression_options (str, optional): What compression settings to use if compressing. Defaults to None. - chunking (bool or tuple, optional): Whether or not to use chunking and the chunk size. Defaults to None. 
""" # Error checking diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py old mode 100755 new mode 100644 index 6b9edc0bd..08cc1a50b --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -28,18 +28,48 @@ def concat_3d_output(source_directory: pathlib.Path, compression_options: str = None, chunking = None): """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a - single, large file. - - Args: - source_directory (pathlib.Path): The directory containing the unconcatenated files - output_directory (pathlib.Path): The directory containing the new concatenated files - num_processes (int): The number of ranks that Cholla was run with - output_number (int): The output number to concatenate - skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. - destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. - compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. - compression_options (str, optional): What compression settings to use if compressing. Defaults to None. - chunking (bool or tuple, optional): Whether or not to use chunking and the chunk size. Defaults to None. + single, large file. + + Parameters + ---------- + source_directory : pathlib.Path + The directory containing the unconcatenated files + output_directory : pathlib.Path + The directory containing the new concatenated files + num_processes : int + The number of ranks that Cholla was run with + output_number : int + The output number to concatenate + skip_fields : list + List of fields to skip concatenating. Defaults to []. + destination_dtype : np.dtype + The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. + compression_type : str + What kind of compression to use on the output data. Defaults to None. + compression_options : str + What compression settings to use if compressing. Defaults to None. + chunking : bool or tuple + Whether or not to use chunking and the chunk size. Defaults to None. 
+ source_directory: pathlib.Path : + + output_directory: pathlib.Path : + + num_processes: int : + + output_number: int : + + skip_fields: list : + (Default value = []) + destination_dtype: np.dtype : + (Default value = None) + compression_type: str : + (Default value = None) + compression_options: str : + (Default value = None) + + Returns + ------- + """ # Error checking @@ -109,17 +139,20 @@ def destination_safe_open(filename: pathlib.Path) -> h5py.File: Parameters ---------- filename : pathlib.Path - The full path and name of the file to open + + The full path and name of the file to open : + + filename: pathlib.Path : + Returns ------- h5py.File - The opened HDF5 file object - Raises - ------ - FileExistsError - Raises if the destination file already exists + The opened HDF5 file object + + + """ try: @@ -137,12 +170,22 @@ def destination_safe_open(filename: pathlib.Path) -> h5py.File: def copy_header(source_file: h5py.File, destination_file: h5py.File): """Copy the attributes of one HDF5 file to another, skipping all fields that are specific to an individual rank - Args: - source_file (h5py.File): The source file - destination_file (h5py.File): The destination file + Parameters + ---------- + source_file : h5py.File + The source file + destination_file : h5py.File + The destination file + source_file: h5py.File : + + destination_file: h5py.File : + + + Returns + ------- + h5py.File + The destination file with the new header attributes - Returns: - h5py.File: The destination file with the new header attributes """ fields_to_skip = ['dims_local', 'offset'] @@ -156,13 +199,21 @@ def copy_header(source_file: h5py.File, destination_file: h5py.File): # ============================================================================== def common_cli() -> argparse.ArgumentParser: """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an - `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method - is used. + `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method + is used. 
+ + Parameters + ---------- + + Returns + ------- + """ # ============================================================================ - # Function used to parse the `--concat-output` argument def concat_output(raw_argument: str) -> list: + """Function used to parse the `--concat-output` argument + """ # Check if the string is empty if len(raw_argument) < 1: raise ValueError('The --concat-output argument must not be of length zero.') From 3fbd10855b14be60713b323ae95ab9db141b5b49 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Mon, 13 Nov 2023 11:50:35 -0700 Subject: [PATCH 11/16] Move all concat common tools into their own file --- python_scripts/concat_2d_data.py | 19 +-- python_scripts/concat_3d_data.py | 184 +---------------------------- python_scripts/concat_internals.py | 178 ++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+), 187 deletions(-) create mode 100644 python_scripts/concat_internals.py diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py index 16f1668a0..5cf6fde55 100644 --- a/python_scripts/concat_2d_data.py +++ b/python_scripts/concat_2d_data.py @@ -15,11 +15,10 @@ """ import h5py -import argparse import pathlib import numpy as np -from concat_3d_data import copy_header, common_cli, destination_safe_open +import concat_internals # ============================================================================== def concat_2d_dataset(source_directory: pathlib.Path, @@ -34,7 +33,7 @@ def concat_2d_dataset(source_directory: pathlib.Path, destination_dtype: np.dtype = None, compression_type: str = None, compression_options: str = None, - chunking = None): + chunking = None) -> None: """Concatenate 2D HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a single, large file. This function concatenates a single output time and can be called multiple times, @@ -104,12 +103,12 @@ def concat_2d_dataset(source_directory: pathlib.Path, assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".' 
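With the shared helpers consolidated into concat_internals.py (added later in this patch), other tools can build on the same CLI rather than redefining it, just as this script does in its `__main__` block. A small sketch, assuming `python_scripts` is on `sys.path`; the extra flag is purely illustrative.

```
import sys
sys.path.append('/PATH/TO/CHOLLA/python_scripts')

import concat_internals

# Start from the common arguments and bolt on a hypothetical script-specific one
cli = concat_internals.common_cli()
cli.add_argument('--my-extra-flag', action='store_true', help='Hypothetical option.')
args = cli.parse_args()

# The common arguments arrive already parsed and validated
print(args.source_directory, args.num_processes, sorted(args.concat_outputs))
```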
# Open destination file - destination_file = destination_safe_open(output_directory / f'{output_number}_{dataset_kind}.h5') + destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}_{dataset_kind}.h5') # Setup the destination file with h5py.File(source_directory / f'{output_number}_{dataset_kind}.h5.0', 'r') as source_file: # Copy over header - destination_file = copy_header(source_file, destination_file) + destination_file = concat_internals.copy_header(source_file, destination_file) # Get a list of all datasets in the source file datasets_to_copy = list(source_file.keys()) @@ -169,7 +168,7 @@ def concat_2d_dataset(source_directory: pathlib.Path, # ============================================================================== # ============================================================================== -def __get_2d_dataset_shape(source_file: h5py.File, dataset: str): +def __get_2d_dataset_shape(source_file: h5py.File, dataset: str) -> tuple: """Determine the shape of the full 2D dataset Args: @@ -200,7 +199,7 @@ def __get_2d_dataset_shape(source_file: h5py.File, dataset: str): # ============================================================================== # ============================================================================== -def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str): +def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str) -> tuple: """Determine the bounds of the concatenated file to write to Args: @@ -211,7 +210,9 @@ def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str): ValueError: If the dataset name isn't a 2D dataset name Returns: - tuple: The write bounds for the concatenated file to be used like `output_file[dataset][return[0]:return[1], return[2]:return[3]] + tuple: The write bounds for the concatenated file to be used like + `output_file[dataset][return[0]:return[1], return[2]:return[3]]` followed by a bool to indicate if the file is + in the slice if concatenating a slice """ if 'xzr' in dataset: @@ -241,7 +242,7 @@ def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str): from timeit import default_timer start = default_timer() - cli = common_cli() + cli = concat_internals.common_cli() cli.add_argument('-d', '--dataset-kind', type=str, required=True, help='What kind of 2D dataset to concatnate. Options are "slice", "proj", and "rot_proj"') cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concating the XY datasets.') cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concating the YZ datasets.') diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py index 08cc1a50b..930c108e2 100644 --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -14,9 +14,10 @@ import h5py import numpy as np -import argparse import pathlib +import concat_internals + # ====================================================================================================================== def concat_3d_output(source_directory: pathlib.Path, output_directory: pathlib.Path, @@ -26,7 +27,7 @@ def concat_3d_output(source_directory: pathlib.Path, destination_dtype: np.dtype = None, compression_type: str = None, compression_options: str = None, - chunking = None): + chunking = None) -> None: """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a single, large file. 
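As with the 2D version, concat_3d_output handles one output time per call, so it can be imported and driven directly from another script. A minimal sketch, assuming `python_scripts` is on `sys.path`; the directories, rank count, and output range are illustrative.

```
import sys
import pathlib

sys.path.append('/PATH/TO/CHOLLA/python_scripts')
from concat_3d_data import concat_3d_output

for output_number in range(0, 11):
    concat_3d_output(source_directory=pathlib.Path('./hdf5/raw'),
                     output_directory=pathlib.Path('./hdf5'),
                     num_processes=16,        # ranks Cholla was run with
                     output_number=output_number,
                     chunking=True)           # let HDF5 pick a chunk shape; a tuple also works
```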
@@ -77,12 +78,12 @@ def concat_3d_output(source_directory: pathlib.Path, assert output_number >= 0, 'output_number must be greater than or equal to 0' # Open the output file for writing - destination_file = destination_safe_open(output_directory / f'{output_number}.h5') + destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}.h5') # Setup the output file with h5py.File(source_directory / f'{output_number}.h5.0', 'r') as source_file: # Copy header data - destination_file = copy_header(source_file, destination_file) + destination_file = concat_internals.copy_header(source_file, destination_file) # Create the datasets in the output file datasets_to_copy = list(source_file.keys()) @@ -132,184 +133,11 @@ def concat_3d_output(source_directory: pathlib.Path, destination_file.close() # ============================================================================== -# ============================================================================== -def destination_safe_open(filename: pathlib.Path) -> h5py.File: - """Opens a HDF5 file safely and provides useful error messages for some common failure modes - - Parameters - ---------- - filename : pathlib.Path - - The full path and name of the file to open : - - filename: pathlib.Path : - - - Returns - ------- - h5py.File - - The opened HDF5 file object - - - - """ - - try: - destination_file = h5py.File(filename, 'w-') - except FileExistsError: - # It might be better for this to simply print the error message and return - # rather than exiting. That way if a single call fails in a parallel - # environment it doesn't take down the entire job - raise FileExistsError(f'File "{filename}" already exists and will not be overwritten, skipping.') - - return destination_file -# ============================================================================== - -# ============================================================================== -def copy_header(source_file: h5py.File, destination_file: h5py.File): - """Copy the attributes of one HDF5 file to another, skipping all fields that are specific to an individual rank - - Parameters - ---------- - source_file : h5py.File - The source file - destination_file : h5py.File - The destination file - source_file: h5py.File : - - destination_file: h5py.File : - - - Returns - ------- - h5py.File - The destination file with the new header attributes - - """ - fields_to_skip = ['dims_local', 'offset'] - - for attr_key in source_file.attrs.keys(): - if attr_key not in fields_to_skip: - destination_file.attrs[attr_key] = source_file.attrs[attr_key] - - return destination_file -# ============================================================================== - -# ============================================================================== -def common_cli() -> argparse.ArgumentParser: - """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an - `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method - is used. 
- - Parameters - ---------- - - Returns - ------- - - """ - - # ============================================================================ - def concat_output(raw_argument: str) -> list: - """Function used to parse the `--concat-output` argument - """ - # Check if the string is empty - if len(raw_argument) < 1: - raise ValueError('The --concat-output argument must not be of length zero.') - - # Strip unneeded characters - cleaned_argument = raw_argument.replace(' ', '') - cleaned_argument = cleaned_argument.replace('[', '') - cleaned_argument = cleaned_argument.replace(']', '') - - # Check that it only has the allowed characters - allowed_charaters = set('0123456789,-') - if not set(cleaned_argument).issubset(allowed_charaters): - raise ValueError("Argument contains incorrect characters. Should only contain '0-9', ',', and '-'.") - - # Split on commas - cleaned_argument = cleaned_argument.split(',') - - # Generate the final list - iterable_argument = set() - for arg in cleaned_argument: - if '-' not in arg: - if int(arg) < 0: - raise ValueError() - iterable_argument.add(int(arg)) - else: - start, end = arg.split('-') - start, end = int(start), int(end) - if end < start: - raise ValueError('The end of a range must be larger than the start of the range.') - if start < 0: - raise ValueError() - iterable_argument = iterable_argument.union(set(range(start, end+1))) - - return iterable_argument - # ============================================================================ - - # ============================================================================ - def positive_int(raw_argument: str) -> int: - arg = int(raw_argument) - if arg < 0: - raise ValueError('Argument must be 0 or greater.') - - return arg - # ============================================================================ - - # ============================================================================ - def skip_fields(raw_argument: str) -> list: - # Strip unneeded characters - cleaned_argument = raw_argument.replace(' ', '') - cleaned_argument = cleaned_argument.replace('[', '') - cleaned_argument = cleaned_argument.replace(']', '') - cleaned_argument = cleaned_argument.split(',') - - return cleaned_argument - # ============================================================================ - - # ============================================================================ - def chunk_arg(raw_argument: str): - # Strip unneeded characters - cleaned_argument = raw_argument.replace(' ', '') - cleaned_argument = cleaned_argument.replace('(', '') - cleaned_argument = cleaned_argument.replace(')', '') - - # Check that it only has the allowed characters - allowed_charaters = set('0123456789,') - if not set(cleaned_argument).issubset(allowed_charaters): - raise ValueError("Argument contains incorrect characters. 
Should only contain '0-9', ',', and '-'.") - - # Convert to a tuple and return - return tuple([int(i) for i in cleaned_argument.split(',')]) - # ============================================================================ - - # Initialize the CLI - cli = argparse.ArgumentParser() - - # Required Arguments - cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the directory for the source HDF5 files.') - cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.') - cli.add_argument('-n', '--num-processes', type=positive_int, required=True, help='The number of processes that were used') - cli.add_argument('-c', '--concat-outputs', type=concat_output, required=True, help='Which outputs to concatenate. Can be a single number (e.g. 8), a range (e.g. 2-9), or a list (e.g. [1,2,3]). Ranges are inclusive') - - # Optional Arguments - cli.add_argument('--skip-fields', type=skip_fields, default=[], help='List of fields to skip concatenating. Defaults to empty.') - cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') - cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') - cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') - cli.add_argument('--chunking', type=chunk_arg, default=None, nargs='?', const=True, help='Enable chunking of the output file. Default is `False`. If set without an argument then the chunk size will be automatically chosen or a tuple can be passed to indicate the chunk size desired.') - - return cli -# ============================================================================== - if __name__ == '__main__': from timeit import default_timer start = default_timer() - cli = common_cli() + cli = concat_internals.common_cli() args = cli.parse_args() # Perform the concatenation diff --git a/python_scripts/concat_internals.py b/python_scripts/concat_internals.py new file mode 100644 index 000000000..29bf49829 --- /dev/null +++ b/python_scripts/concat_internals.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +""" +Contains all the common tools for the various concatnation functions/scipts +""" + +import h5py +import argparse +import pathlib + +# ============================================================================== +def destination_safe_open(filename: pathlib.Path) -> h5py.File: + """Opens a HDF5 file safely and provides useful error messages for some common failure modes + + Parameters + ---------- + filename : pathlib.Path + + The full path and name of the file to open : + + filename: pathlib.Path : + + + Returns + ------- + h5py.File + + The opened HDF5 file object + """ + + try: + destination_file = h5py.File(filename, 'w-') + except FileExistsError: + # It might be better for this to simply print the error message and return + # rather than exiting. 
That way if a single call fails in a parallel + # environment it doesn't take down the entire job + raise FileExistsError(f'File "{filename}" already exists and will not be overwritten, skipping.') + + return destination_file +# ============================================================================== + +# ============================================================================== +def copy_header(source_file: h5py.File, destination_file: h5py.File) -> h5py.File: + """Copy the attributes of one HDF5 file to another, skipping all fields that are specific to an individual rank + + Parameters + ---------- + source_file : h5py.File + The source file + destination_file : h5py.File + The destination file + source_file: h5py.File : + + destination_file: h5py.File : + + + Returns + ------- + h5py.File + The destination file with the new header attributes + """ + fields_to_skip = ['dims_local', 'offset'] + + for attr_key in source_file.attrs.keys(): + if attr_key not in fields_to_skip: + destination_file.attrs[attr_key] = source_file.attrs[attr_key] + + return destination_file +# ============================================================================== + +# ============================================================================== +def common_cli() -> argparse.ArgumentParser: + """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an + `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method + is used. + + Parameters + ---------- + + Returns + ------- + argparse.ArgumentParser + The common components of the CLI for the concatenation scripts + """ + + # ============================================================================ + def concat_output(raw_argument: str) -> list: + """Function used to parse the `--concat-output` argument + """ + # Check if the string is empty + if len(raw_argument) < 1: + raise ValueError('The --concat-output argument must not be of length zero.') + + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('[', '') + cleaned_argument = cleaned_argument.replace(']', '') + + # Check that it only has the allowed characters + allowed_charaters = set('0123456789,-') + if not set(cleaned_argument).issubset(allowed_charaters): + raise ValueError("Argument contains incorrect characters. 
Should only contain '0-9', ',', and '-'.") + + # Split on commas + cleaned_argument = cleaned_argument.split(',') + + # Generate the final list + iterable_argument = set() + for arg in cleaned_argument: + if '-' not in arg: + if int(arg) < 0: + raise ValueError() + iterable_argument.add(int(arg)) + else: + start, end = arg.split('-') + start, end = int(start), int(end) + if end < start: + raise ValueError('The end of a range must be larger than the start of the range.') + if start < 0: + raise ValueError() + iterable_argument = iterable_argument.union(set(range(start, end+1))) + + return iterable_argument + # ============================================================================ + + # ============================================================================ + def positive_int(raw_argument: str) -> int: + arg = int(raw_argument) + if arg < 0: + raise ValueError('Argument must be 0 or greater.') + + return arg + # ============================================================================ + + # ============================================================================ + def skip_fields(raw_argument: str) -> list: + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('[', '') + cleaned_argument = cleaned_argument.replace(']', '') + cleaned_argument = cleaned_argument.split(',') + + return cleaned_argument + # ============================================================================ + + # ============================================================================ + def chunk_arg(raw_argument: str) -> tuple: + # Strip unneeded characters + cleaned_argument = raw_argument.replace(' ', '') + cleaned_argument = cleaned_argument.replace('(', '') + cleaned_argument = cleaned_argument.replace(')', '') + + # Check that it only has the allowed characters + allowed_charaters = set('0123456789,') + if not set(cleaned_argument).issubset(allowed_charaters): + raise ValueError("Argument contains incorrect characters. Should only contain '0-9', ',', and '-'.") + + # Convert to a tuple and return + return tuple([int(i) for i in cleaned_argument.split(',')]) + # ============================================================================ + + # Initialize the CLI + cli = argparse.ArgumentParser() + + # Required Arguments + cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the directory for the source HDF5 files.') + cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.') + cli.add_argument('-n', '--num-processes', type=positive_int, required=True, help='The number of processes that were used') + cli.add_argument('-c', '--concat-outputs', type=concat_output, required=True, help='Which outputs to concatenate. Can be a single number (e.g. 8), a range (e.g. 2-9), or a list (e.g. [1,2,3]). Ranges are inclusive') + + # Optional Arguments + cli.add_argument('--skip-fields', type=skip_fields, default=[], help='List of fields to skip concatenating. Defaults to empty.') + cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.') + cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. 
Defaults to None.') + cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') + cli.add_argument('--chunking', type=chunk_arg, default=None, nargs='?', const=True, help='Enable chunking of the output file. Default is `False`. If set without an argument then the chunk size will be automatically chosen or a tuple can be passed to indicate the chunk size desired.') + + return cli +# ============================================================================== From 572ac721c12a2c5a0406eb49cfc0d83acbb96b34 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Mon, 13 Nov 2023 11:53:11 -0700 Subject: [PATCH 12/16] Remove deprecated cat.py All the functionality of cat.py is now available in concat_2d_data.py and concat_3d_data.py. Marked concatenation files as executable --- python_scripts/cat.py | 406 ----------------------------- python_scripts/cat_particles.py | 0 python_scripts/concat_2d_data.py | 0 python_scripts/concat_3d_data.py | 0 python_scripts/concat_internals.py | 0 5 files changed, 406 deletions(-) delete mode 100755 python_scripts/cat.py mode change 100644 => 100755 python_scripts/cat_particles.py mode change 100644 => 100755 python_scripts/concat_2d_data.py mode change 100644 => 100755 python_scripts/concat_3d_data.py mode change 100644 => 100755 python_scripts/concat_internals.py diff --git a/python_scripts/cat.py b/python_scripts/cat.py deleted file mode 100755 index dc840c570..000000000 --- a/python_scripts/cat.py +++ /dev/null @@ -1,406 +0,0 @@ -# Utils for concat cholla output - -import h5py -import numpy as np -import os - -verbose = True - -def parse(argv): - # Determine prefix - if 'h5' in argv: - preprefix = argv.split('.h5')[0] - prefix = preprefix +'.h5' - - else: - prefix = './{}.h5'.format(argv) - - # Check existing - firstfile = prefix+'.0' - if not os.path.isfile(firstfile): - print(firstfile,' is missing') - exit() - - # Set dirnames - dnamein = os.path.dirname(firstfile)+'/' - dnameout = os.path.dirname(firstfile) + '/' - return dnamein,dnameout - -def hydro(n,dnamein,dnameout,double=True): - """ - n: integer, output number of file - dnamein: string, directory name of input files, should include '/' at end or leave blank for current directory - dnameout: string, directory name of output files, should include '/' at end or leave blank for current directory - double: optional bool, double precision (float64) if True, single precision (float32) if False - - Reads files of form dnamein{n}.h5.{rank}, looping over rank, outputting to file dnameout{n}.h5. 
- """ - - fileout = h5py.File(dnameout+str(n)+'.h5', 'a') - - i = -1 - # loops over all files - while True: - i += 1 - - fileinname = dnamein+str(n)+'.h5.'+str(i) - - if not os.path.isfile(fileinname): - break - print('Load:',fileinname,flush=True) - - # open the input file for reading - filein = h5py.File(fileinname,'r') - - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - fileout.attrs['dims'] = [nx, ny, nz] - fileout.attrs['gamma'] = [head['gamma'][0]] - fileout.attrs['t'] = [head['t'][0]] - fileout.attrs['dt'] = [head['dt'][0]] - fileout.attrs['n_step'] = [head['n_step'][0]] - - units = ['time_unit', 'mass_unit', 'length_unit', 'energy_unit', 'velocity_unit', 'densit\ -y_unit'] - for unit in units: - fileout.attrs[unit] = [head[unit][0]] - keys = list(filein.keys()) - #['density','momentum_x','momentum_y','momentum_z','Energy','GasEnergy','scalar0'] - - for key in keys: - if key not in fileout: - # WARNING: If you don't set dataset dtype it will default to 32-bit, but CHOLLA likes to be 64-bit - if double: - dtype = filein[key].dtype - else: - dtype = None - if nz > 1: - fileout.create_dataset(key, (nx, ny, nz), chunks=(nxl,nyl,nzl), dtype=dtype) - elif ny > 1: - fileout.create_dataset(key, (nx, ny), chunks=(nxl,nyl), dtype=dtype) - elif nx > 1: - fileout.create_dataset(key, (nx,), chunks=(nxl,), dtype=dtype) - #fileout.create_dataset(key, (nx, ny, nz)) - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - for key in keys: - if key in filein: - if nz > 1: - fileout[key][xs:xs+nxl,ys:ys+nyl,zs:zs+nzl] = filein[key] - elif ny > 1: - fileout[key][xs:xs+nxl,ys:ys+nyl] = filein[key] - elif nx > 1: - fileout[key][xs:xs+nxl] = filein[key] - filein.close() - - # end loop over all files - fileout.close() - - -def projection(n,dnamein,dnameout): - """ - n: integer, output number of file - dnamein: string, directory name of input files, should include '/' at end or leave blank for current directory - dnameout: string, directory name of output files, should include '/' at end or leave blank for current directory - double: optional bool, double precision (float64) if True, single precision (float32) if False - - Reads files of form dnamein{n}.h5.{rank}, looping over rank, outputting to file dnameout{n}.h5. 
- """ - - # open the output file for writing - fileout = h5py.File(dnameout+str(n)+'_proj.h5', 'w') - i = -1 - while True: - i += 1 - - fileinname = dnamein+str(n)+'_proj.h5.'+str(i) - - if not os.path.isfile(fileinname): - break - - if verbose: - print(fileinname) - # open the input file for reading - filein = h5py.File(fileinname,'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['dims'] = [nx, ny, nz] - fileout.attrs['gamma'] = [head['gamma'][0]] - fileout.attrs['t'] = [head['t'][0]] - fileout.attrs['dt'] = [head['dt'][0]] - fileout.attrs['n_step'] = [head['n_step'][0]] - - dxy = np.zeros((nx,ny)) - dxz = np.zeros((nx,nz)) - Txy = np.zeros((nx,ny)) - Txz = np.zeros((nx,nz)) - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - - dxy[xs:xs+nxl,ys:ys+nyl] += filein['d_xy'] - dxz[xs:xs+nxl,zs:zs+nzl] += filein['d_xz'] - Txy[xs:xs+nxl,ys:ys+nyl] += filein['T_xy'] - Txz[xs:xs+nxl,zs:zs+nzl] += filein['T_xz'] - - filein.close() - - # write out the new datasets - fileout.create_dataset('d_xy', data=dxy) - fileout.create_dataset('d_xz', data=dxz) - fileout.create_dataset('T_xy', data=Txy) - fileout.create_dataset('T_xz', data=Txz) - - fileout.close() - return - -def slice(n,dnamein,dnameout): - """ - n: integer, output number of file - dnamein: string, directory name of input files, should include '/' at end or leave blank for current directory - dnameout: string, directory name of output files, should include '/' at end or leave blank for current directory - double: optional bool, double precision (float64) if True, single precision (float32) if False - - Reads files of form dnamein{n}_slice.h5.{rank}, looping over rank, outputting to file dnameout{n}_slice.h5. 
- """ - - # open the output file for writing - fileout = h5py.File(dnameout+str(n)+'_slice.h5', 'w') - - i = -1 - while True: - # loop over files for a given output time - i += 1 - - fileinname = dnamein+str(n)+'_slice.h5.'+str(i) - if not os.path.isfile(fileinname): - break - - if verbose: - print(fileinname) - # open the input file for reading - filein = h5py.File(fileinname,'r') - # read in the header data from the input file - head = filein.attrs - - # Detect DE - DE = 'GE_xy' in filein - SCALAR = 'scalar_xy' in filein - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - gamma = head['gamma'] - t = head['t'] - dt = head['dt'] - n_step = head['n_step'] - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['gamma'] = gamma - fileout.attrs['t'] = t - fileout.attrs['dt'] = dt - fileout.attrs['n_step'] = n_step - fileout.attrs['dims'] = [nx, ny, nz] - - d_xy = np.zeros((nx,ny)) - d_xz = np.zeros((nx,nz)) - d_yz = np.zeros((ny,nz)) - mx_xy = np.zeros((nx,ny)) - mx_xz = np.zeros((nx,nz)) - mx_yz = np.zeros((ny,nz)) - my_xy = np.zeros((nx,ny)) - my_xz = np.zeros((nx,nz)) - my_yz = np.zeros((ny,nz)) - mz_xy = np.zeros((nx,ny)) - mz_xz = np.zeros((nx,nz)) - mz_yz = np.zeros((ny,nz)) - E_xy = np.zeros((nx,ny)) - E_xz = np.zeros((nx,nz)) - E_yz = np.zeros((ny,nz)) - if DE: - GE_xy = np.zeros((nx,ny)) - GE_xz = np.zeros((nx,nz)) - GE_yz = np.zeros((ny,nz)) - if SCALAR: - scalar_xy = np.zeros((nx,ny)) - scalar_xz = np.zeros((nx,nz)) - scalar_yz = np.zeros((ny,nz)) - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - - d_xy[xs:xs+nxl,ys:ys+nyl] += filein['d_xy'] - d_xz[xs:xs+nxl,zs:zs+nzl] += filein['d_xz'] - d_yz[ys:ys+nyl,zs:zs+nzl] += filein['d_yz'] - mx_xy[xs:xs+nxl,ys:ys+nyl] += filein['mx_xy'] - mx_xz[xs:xs+nxl,zs:zs+nzl] += filein['mx_xz'] - mx_yz[ys:ys+nyl,zs:zs+nzl] += filein['mx_yz'] - my_xy[xs:xs+nxl,ys:ys+nyl] += filein['my_xy'] - my_xz[xs:xs+nxl,zs:zs+nzl] += filein['my_xz'] - my_yz[ys:ys+nyl,zs:zs+nzl] += filein['my_yz'] - mz_xy[xs:xs+nxl,ys:ys+nyl] += filein['mz_xy'] - mz_xz[xs:xs+nxl,zs:zs+nzl] += filein['mz_xz'] - mz_yz[ys:ys+nyl,zs:zs+nzl] += filein['mz_yz'] - E_xy[xs:xs+nxl,ys:ys+nyl] += filein['E_xy'] - E_xz[xs:xs+nxl,zs:zs+nzl] += filein['E_xz'] - E_yz[ys:ys+nyl,zs:zs+nzl] += filein['E_yz'] - if DE: - GE_xy[xs:xs+nxl,ys:ys+nyl] += filein['GE_xy'] - GE_xz[xs:xs+nxl,zs:zs+nzl] += filein['GE_xz'] - GE_yz[ys:ys+nyl,zs:zs+nzl] += filein['GE_yz'] - if SCALAR: - scalar_xy[xs:xs+nxl,ys:ys+nyl] += filein['scalar_xy'] - scalar_xz[xs:xs+nxl,zs:zs+nzl] += filein['scalar_xz'] - scalar_yz[ys:ys+nyl,zs:zs+nzl] += filein['scalar_yz'] - - filein.close() - - # wrte out the new datasets - fileout.create_dataset('d_xy', data=d_xy) - fileout.create_dataset('d_xz', data=d_xz) - fileout.create_dataset('d_yz', data=d_yz) - fileout.create_dataset('mx_xy', data=mx_xy) - fileout.create_dataset('mx_xz', data=mx_xz) - fileout.create_dataset('mx_yz', data=mx_yz) - fileout.create_dataset('my_xy', data=my_xy) - fileout.create_dataset('my_xz', data=my_xz) - fileout.create_dataset('my_yz', data=my_yz) - fileout.create_dataset('mz_xy', data=mz_xy) - fileout.create_dataset('mz_xz', data=mz_xz) - fileout.create_dataset('mz_yz', data=mz_yz) - fileout.create_dataset('E_xy', data=E_xy) - 
fileout.create_dataset('E_xz', data=E_xz) - fileout.create_dataset('E_yz', data=E_yz) - if DE: - fileout.create_dataset('GE_xy', data=GE_xy) - fileout.create_dataset('GE_xz', data=GE_xz) - fileout.create_dataset('GE_yz', data=GE_yz) - if SCALAR: - fileout.create_dataset('scalar_xy', data=scalar_xy) - fileout.create_dataset('scalar_xz', data=scalar_xz) - fileout.create_dataset('scalar_yz', data=scalar_yz) - - fileout.close() - return - -def rot_proj(n,dnamein,dnameout): - """ - n: integer, output number of file - dnamein: string, directory name of input files, should include '/' at end or leave blank for current directory - dnameout: string, directory name of output files, should include '/' at end or leave blank for current directory - double: optional bool, double precision (float64) if True, single precision (float32) if False - - Reads files of form dnamein{n}_rot_proj.h5.{rank}, looping over rank, outputting to file dnameout{n}_rot_proj.h5. - """ - - fileout = h5py.File(dnameout+str(n)+'_rot_proj.h5', 'w') - i = -1 - - while True: - # loop over files for a given output time - i += 1 - fileinname = dnamein+str(n)+'_rot_proj.h5.'+str(i) - if not os.path.isfile(fileinname): - break - - if verbose: - print(fileinname) - - filein = h5py.File(dnamein+fileinname,'r') - head = filein.attrs - # if it's the first input file, write the header attributes - # and create the arrays to hold the output data - if (i == 0): - - nxr = int(head['nxr']) - nzr = int(head['nzr']) - Lx = head['Lx'] - Lz = head['Lz'] - delta = head['delta'] - theta = head['theta'] - phi = head['phi'] - gamma = head['gamma'] - t = head['t'] - dt = head['dt'] - n_step = head['n_step'] - fileout.attrs['nxr'] = nxr - fileout.attrs['nzr'] = nzr - fileout.attrs['Lx'] = Lx - fileout.attrs['Lz'] = Lz - fileout.attrs['delta'] = delta - fileout.attrs['theta'] = theta - fileout.attrs['phi'] = phi - fileout.attrs['gamma'] = gamma - fileout.attrs['t'] = t - fileout.attrs['dt'] = dt - fileout.attrs['n_step'] = n_step - - d_xzr = np.zeros((nxr, nzr)) - vx_xzr = np.zeros((nxr, nzr)) - vy_xzr = np.zeros((nxr, nzr)) - vz_xzr = np.zeros((nxr, nzr)) - T_xzr = np.zeros((nxr, nzr)) - - # end first input file - - # write data from individual processor file to - # correct location in concatenated file - nx_min = int(head['nx_min']) - nx_max = int(head['nx_max']) - nz_min = int(head['nz_min']) - nz_max = int(head['nz_max']) - - d_xzr[nx_min:nx_max,nz_min:nz_max] += filein['d_xzr'][:] - vx_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vx_xzr'][:] - vy_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vy_xzr'][:] - vz_xzr[nx_min:nx_max,nz_min:nz_max] += filein['vz_xzr'][:] - T_xzr[nx_min:nx_max,nz_min:nz_max] += filein['T_xzr'][:] - - filein.close() - # end while loop - - # write out the new datasets - fileout.create_dataset("d_xzr", data=d_xzr) - fileout.create_dataset("vx_xzr", data=vx_xzr) - fileout.create_dataset("vy_xzr", data=vy_xzr) - fileout.create_dataset("vz_xzr", data=vz_xzr) - fileout.create_dataset("T_xzr", data=T_xzr) - - fileout.close() diff --git a/python_scripts/cat_particles.py b/python_scripts/cat_particles.py old mode 100644 new mode 100755 diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py old mode 100644 new mode 100755 diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py old mode 100644 new mode 100755 diff --git a/python_scripts/concat_internals.py b/python_scripts/concat_internals.py old mode 100644 new mode 100755 From 8152145e82619e65755cce8abcf0cedf97afb0c1 Mon Sep 17 00:00:00 
2001 From: Bob Caddy Date: Mon, 13 Nov 2023 11:59:50 -0700 Subject: [PATCH 13/16] Rename concat_3d_output to concat_3d_dataset for consistency with 2d version --- python_scripts/concat_3d_data.py | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py index 930c108e2..73194ebf0 100755 --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -19,15 +19,15 @@ import concat_internals # ====================================================================================================================== -def concat_3d_output(source_directory: pathlib.Path, - output_directory: pathlib.Path, - num_processes: int, - output_number: int, - skip_fields: list = [], - destination_dtype: np.dtype = None, - compression_type: str = None, - compression_options: str = None, - chunking = None) -> None: +def concat_3d_dataset(source_directory: pathlib.Path, + output_directory: pathlib.Path, + num_processes: int, + output_number: int, + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None, + chunking = None) -> None: """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a single, large file. @@ -142,14 +142,14 @@ def concat_3d_output(source_directory: pathlib.Path, # Perform the concatenation for output in args.concat_outputs: - concat_3d_output(source_directory=args.source_directory, - output_directory=args.output_directory, - num_processes=args.num_processes, - output_number=output, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts, - chunking=args.chunking) + concat_3d_dataset(source_directory=args.source_directory, + output_directory=args.output_directory, + num_processes=args.num_processes, + output_number=output, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts, + chunking=args.chunking) print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From 611eb163bd6c020752c0fc5344460cc0a21e3d69 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Mon, 13 Nov 2023 13:39:55 -0700 Subject: [PATCH 14/16] Update particles concatenation file to new method --- python_scripts/cat_particles.py | 91 ----------- python_scripts/concat_3d_data.py | 8 +- python_scripts/concat_internals.py | 2 +- python_scripts/concat_particles.py | 250 +++++++++++++++++++++++++++++ 4 files changed, 255 insertions(+), 96 deletions(-) delete mode 100755 python_scripts/cat_particles.py create mode 100755 python_scripts/concat_particles.py diff --git a/python_scripts/cat_particles.py b/python_scripts/cat_particles.py deleted file mode 100755 index 03cbcd71c..000000000 --- a/python_scripts/cat_particles.py +++ /dev/null @@ -1,91 +0,0 @@ -# Example file for concatenating particle data - -import h5py -import numpy as np - -ns = 0 -ne = 300 -n_procs = 4 # number of processors that did the cholla calculation -dnamein = '/gpfs/alpine/proj-shared/csc380/orlandow/o_cholla/out.21Sep20-Mon-12.49-356588-SOR_ONLY_PARTICLES_DISK/raw/' -dnameout = '/gpfs/alpine/proj-shared/csc380/orlandow/o_cholla/out.21Sep20-Mon-12.49-356588-SOR_ONLY_PARTICLES_DISK/particles_cat/' - -# loop over the output times -for n in range(ns, ne+1): - - # open the output file for writing - fileout = 
h5py.File(dnameout+str(n)+'_particles.h5', 'w') - - if (n % 10 == 0): print(str(n)) - - # loop over files for a given output time - for i in range(0, n_procs): - - # open the input file for reading - filein = h5py.File(dnamein+str(n)+'_particles.h5.'+str(i), 'r') - # read in the header data from the input file - head = filein.attrs - - # if it's the first input file, write the header attributes - # and create the datasets in the output file - if (i == 0): - gamma = head['gamma'] - t = head['t'] - dt = head['dt'] - n_step = head['n_step'] - nx = head['dims'][0] - ny = head['dims'][1] - nz = head['dims'][2] - fileout.attrs['gamma'] = gamma - fileout.attrs['t'] = t - fileout.attrs['dt'] = dt - fileout.attrs['n_step'] = n_step - fileout.attrs['dims'] = [nx, ny, nz] - fileout.attrs['velocity_unit'] = head['velocity_unit'] - fileout.attrs['length_unit'] = head['length_unit'] - fileout.attrs['particle_mass'] = head['particle_mass'] - fileout.attrs['density_unit'] = head['density_unit'] - - x = np.array([]) - y = np.array([]) - z = np.array([]) - vx = np.array([]) - vy = np.array([]) - vz = np.array([]) - particle_ids = np.array([]) - density = np.zeros((nx, ny, nz)) - n_total_particles = 0 - - - # write data from individual processor file to - # correct location in concatenated file - nxl = head['dims_local'][0] - nyl = head['dims_local'][1] - nzl = head['dims_local'][2] - xs = head['offset'][0] - ys = head['offset'][1] - zs = head['offset'][2] - - n_total_particles += head['n_particles_local'] - density[xs:xs+nxl, ys:ys+nyl, zs:zs+nzl] += filein['density'] - x = np.append(x, filein['pos_x']) - y = np.append(y, filein['pos_y']) - z = np.append(z, filein['pos_z']) - vx = np.append(vx, filein['vel_x']) - vy = np.append(vy, filein['vel_y']) - vz = np.append(vz, filein['vel_z']) - particle_ids = np.append(particle_ids, filein['particle_IDs']) - - filein.close() - - # write out the new datasets - fileout.create_dataset('x', data=x) - fileout.create_dataset('y', data=y) - fileout.create_dataset('z', data=z) - fileout.create_dataset('vx', data=vx) - fileout.create_dataset('vy', data=vy) - fileout.create_dataset('vz', data=vz) - fileout.create_dataset('particle_ids', data=particle_ids) - fileout.create_dataset('density', data=density) - fileout.attrs['n_total_particles'] = n_total_particles - - fileout.close() diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py index 73194ebf0..599a4a8d1 100755 --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ Python script for concatenating 3D hdf5 datasets. Includes a CLI for concatenating Cholla HDF5 datasets and can be -imported into other scripts where the `concat_3d_field` function can be used to concatenate the datasets. +imported into other scripts where the `concat_3d_dataset` function can be used to concatenate the datasets. 
Generally the easiest way to import this script is to add the `python_scripts` directory to your python path in your script like this: @@ -18,7 +18,7 @@ import concat_internals -# ====================================================================================================================== +# ============================================================================== def concat_3d_dataset(source_directory: pathlib.Path, output_directory: pathlib.Path, num_processes: int, @@ -28,8 +28,8 @@ def concat_3d_dataset(source_directory: pathlib.Path, compression_type: str = None, compression_options: str = None, chunking = None) -> None: - """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files generated per process and concatenate them into a - single, large file. + """Concatenate a single 3D HDF5 Cholla dataset. i.e. take the single files + generated per process and concatenate them into a single, large file. Parameters ---------- diff --git a/python_scripts/concat_internals.py b/python_scripts/concat_internals.py index 29bf49829..6f90f0211 100755 --- a/python_scripts/concat_internals.py +++ b/python_scripts/concat_internals.py @@ -58,7 +58,7 @@ def copy_header(source_file: h5py.File, destination_file: h5py.File) -> h5py.Fil h5py.File The destination file with the new header attributes """ - fields_to_skip = ['dims_local', 'offset'] + fields_to_skip = ['dims_local', 'offset', 'n_particles_local'] for attr_key in source_file.attrs.keys(): if attr_key not in fields_to_skip: diff --git a/python_scripts/concat_particles.py b/python_scripts/concat_particles.py new file mode 100755 index 000000000..d286a4fec --- /dev/null +++ b/python_scripts/concat_particles.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +Python script for concatenating particle hdf5 datasets. Includes a CLI for concatenating Cholla HDF5 datasets and can be +imported into other scripts where the `concat_particles_dataset` function can be used to concatenate the datasets. + +Generally the easiest way to import this script is to add the `python_scripts` directory to your python path in your +script like this: +``` +import sys +sys.path.append('/PATH/TO/CHOLLA/python_scripts') +import concat_particles +``` +""" + +import h5py +import numpy as np +import pathlib + +import concat_internals + +# ====================================================================================================================== +def concat_particles_dataset(source_directory: pathlib.Path, + output_directory: pathlib.Path, + num_processes: int, + output_number: int, + skip_fields: list = [], + destination_dtype: np.dtype = None, + compression_type: str = None, + compression_options: str = None, + chunking = None) -> None: + """Concatenate a single particle HDF5 Cholla dataset. i.e. take the single + files generated per process and concatenate them into a single, large file. + + Parameters + ---------- + source_directory : pathlib.Path + The directory containing the unconcatenated files + output_directory : pathlib.Path + The directory containing the new concatenated files + num_processes : int + The number of ranks that Cholla was run with + output_number : int + The output number to concatenate + skip_fields : list + List of fields to skip concatenating. Defaults to []. + destination_dtype : np.dtype + The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. + compression_type : str + What kind of compression to use on the output data. Defaults to None. 
+ compression_options : str + What compression settings to use if compressing. Defaults to None. + chunking : bool or tuple + Whether or not to use chunking and the chunk size. Defaults to None. + source_directory: pathlib.Path : + + output_directory: pathlib.Path : + + num_processes: int : + + output_number: int : + + skip_fields: list : + (Default value = []) + destination_dtype: np.dtype : + (Default value = None) + compression_type: str : + (Default value = None) + compression_options: str : + (Default value = None) + + Returns + ------- + + """ + + # Error checking + assert num_processes > 1, 'num_processes must be greater than 1' + assert output_number >= 0, 'output_number must be greater than or equal to 0' + + # Open the output file for writing + destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}_particles.h5') + + # Setup the output file + # Note that the call to `__get_num_particles` is potentially expensive as it + # opens every single file to read the number of particles in that file + num_particles = __get_num_particles(source_directory, num_processes, output_number) + destination_file = __setup_destination_file(source_directory, + destination_file, + output_number, + num_particles, + skip_fields, + destination_dtype, + compression_type, + compression_options, + chunking) + + # loop over files for a given output + particles_offset = 0 + for i in range(0, num_processes): + # open the input file for reading + source_file = h5py.File(source_directory / f'{output_number}_particles.h5.{i}', 'r') + + # Compute the offset slicing for the 3D data + nx_local, ny_local, nz_local = source_file.attrs['dims_local'] + x_start, y_start, z_start = source_file.attrs['offset'] + x_end, y_end, z_end = x_start+nx_local, y_start+ny_local, z_start+nz_local + + # Get the local number of particles + num_particles_local = source_file.attrs['n_particles_local'][0] + + # write data from individual processor file to correct location in concatenated file + for dataset in list(destination_file.keys()): + + if dataset == 'density': + destination_file[dataset][x_start:x_end, + y_start:y_end, + z_start:z_end] = source_file[dataset] + else: + start = particles_offset + end = particles_offset + num_particles_local + destination_file[dataset][start:end] = source_file[dataset] + + # Update the particles offset + particles_offset += num_particles_local + + # Now that the copy is done we close the source file + source_file.close() + + # Close destination file now that it is fully constructed + destination_file.close() +# ============================================================================== + +# ============================================================================== +def __get_num_particles(source_directory: pathlib.Path, + num_processes: int, + output_number: int) -> int: + """Get the total number of particles in the output. This function is heavily + I/O bound and might benefit from utilizing threads. 
+ + Parameters + ---------- + source_directory : pathlib.Path + The directory of the unconcatenated files + num_processes : int + The number of processes + output_number : int + The output number to get data from + + Returns + ------- + int + The total number of particles in the output + """ + # loop over files for a given output + num_particles = 0 + for i in range(0, num_processes): + # open the input file for reading + with h5py.File(source_directory / f'{output_number}_particles.h5.{i}', 'r') as source_file: + num_particles += source_file.attrs['n_particles_local'] + + return num_particles +# ============================================================================== + +# ============================================================================== +def __setup_destination_file(source_directory: pathlib.Path, + destination_file: h5py.File, + output_number: int, + num_particles: int, + skip_fields: list, + destination_dtype: np.dtype, + compression_type: str, + compression_options: str, + chunking) -> h5py.File: + """_summary_ + + Parameters + ---------- + source_directory : pathlib.Path + The directory containing the unconcatenated files + destination_file : h5py.File + The destination file + output_number : int + The output number to concatenate + num_particles : int + The total number of particles in the output + skip_fields : list + List of fields to skip concatenating. + destination_dtype : np.dtype + The data type of the output datasets. Accepts most numpy types. + compression_type : str + What kind of compression to use on the output data. + compression_options : str + What compression settings to use if compressing. + chunking : _type_ + Whether or not to use chunking and the chunk size. + + Returns + ------- + h5py.File + The fully set up destination file + """ + with h5py.File(source_directory / f'{output_number}_particles.h5.0', 'r') as source_file: + # Copy header data + destination_file = concat_internals.copy_header(source_file, destination_file) + + # Make list of datasets to copy + datasets_to_copy = list(source_file.keys()) + datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] + + # Create the datasets in the output file + for dataset in datasets_to_copy: + dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype + + # Determine the shape of the dataset + if dataset == 'density': + data_shape = source_file.attrs['dims'] + else: + data_shape = num_particles + + # Create the dataset + destination_file.create_dataset(name=dataset, + shape=data_shape, + dtype=dtype, + chunks=chunking, + compression=compression_type, + compression_opts=compression_options) + + return destination_file +# ============================================================================== + +if __name__ == '__main__': + from timeit import default_timer + start = default_timer() + + cli = concat_internals.common_cli() + args = cli.parse_args() + + # Perform the concatenation + for output in args.concat_outputs: + concat_particles_dataset(source_directory=args.source_directory, + output_directory=args.output_directory, + num_processes=args.num_processes, + output_number=output, + skip_fields=args.skip_fields, + destination_dtype=args.dtype, + compression_type=args.compression_type, + compression_options=args.compression_opts, + chunking=args.chunking) + + print(f'\nTime to execute: {round(default_timer()-start,2)} seconds') From 9b751a5b36e02ee2bb45ec1fc85fdfbb93e4d14a Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Mon, 13 Nov 
2023 13:45:00 -0700 Subject: [PATCH 15/16] Update python_scripts/README.md for new concat scripts --- python_scripts/README.md | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python_scripts/README.md b/python_scripts/README.md index 5a462e8c1..acda923b7 100644 --- a/python_scripts/README.md +++ b/python_scripts/README.md @@ -5,15 +5,8 @@ You will likely develop more customized, robust, and flexible scripts for your o These simple scripts here are intended to help you understand the basics of the generated data from Cholla. ## Merging HDF5 files -Multi-processor runs generate HDF5 files per-timestep per-processor. -To treat each timestep together we want to merge those per-processor HDF5 files. -| Script | Concatenate | | ------ | ----------- | -`cat_dset_3d.py` | 3D HDF5 datasets -`cat_projection.py` | The on-axis projection data created when the -DPROJECTION flag is turned on -`cat_rotated_projection.py` | The rotated projection data created when the -DROTATED_PROJECTION flag is turned on -`cat_slice.py` | The on-axis slice data created when the -DSLICES flag is turned on +Multi-processor runs generate HDF5 files per-timestep per-processor. Merging these per-process outputs into a single file can be done with the concatenation scripts detailed in the "Outputs" section of the wiki. ## Plotting data We here present simple Python matplotlib-based scripts to plot density, velocity, energy, and pressure. From c1f6d9e7d8a46a128f1ef93468f4aa044ba3bbb2 Mon Sep 17 00:00:00 2001 From: Bob Caddy Date: Thu, 16 Nov 2023 16:29:08 -0700 Subject: [PATCH 16/16] Fix placeholder comment --- python_scripts/concat_particles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_scripts/concat_particles.py b/python_scripts/concat_particles.py index d286a4fec..8a916f08e 100755 --- a/python_scripts/concat_particles.py +++ b/python_scripts/concat_particles.py @@ -171,7 +171,7 @@ def __setup_destination_file(source_directory: pathlib.Path, compression_type: str, compression_options: str, chunking) -> h5py.File: - """_summary_ + """Set up the destination file by copying the header and setting up the datasets Parameters ----------
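For reference, the concatenation modules introduced in this series are designed to be imported as well as run from the command line. The sketch below shows one way the new particle concatenation could be driven from another script; the directory paths, rank count, and output number are illustrative placeholders rather than values taken from any run in this series.

```
# Usage sketch only -- paths, rank count, and output number are illustrative placeholders.
import pathlib
import sys

# Make the Cholla python_scripts directory importable, as recommended in the module docstrings
sys.path.append('/PATH/TO/CHOLLA/python_scripts')

import concat_particles

# Concatenate the particle files for output 0 of a hypothetical 16-rank run
concat_particles.concat_particles_dataset(source_directory=pathlib.Path('./raw'),
                                          output_directory=pathlib.Path('./particles_cat'),
                                          num_processes=16,
                                          output_number=0,
                                          compression_type='gzip')  # optional; defaults to no compression
```

The same operation is available from the shared CLI defined in `concat_internals.common_cli`, e.g. `python concat_particles.py -s ./raw -o ./particles_cat -n 16 -c 0` (again with illustrative paths and values).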