diff --git a/bin/rechunker b/bin/rechunker
index 691f2f5a7..2f760e2ff 100644
--- a/bin/rechunker
+++ b/bin/rechunker
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 import os.path
 import argparse
+import typing
+
 import strax
 import pandas as pd
@@ -13,7 +15,7 @@ def parse_args():
         '--source',
         type=str,
         help="Target directory to rechunk, should be a folder in a "
-             "strax.DataDrictory (one datatype)")
+             "strax.DataDirectory (one datatype)")
     parser.add_argument(
         '--dest', '--destination',
         default=None,
@@ -44,12 +46,26 @@ def parse_args():
         type=str,
         default=None,
         help="Write some information to this file (csv format)")
+    parser.add_argument(
+        '--parallel',
+        type=str,
+        default='False',
+        choices=['False', 'True', 'thread', 'process'],
+        help="Parallelize using a thread pool or a process pool")
+    parser.add_argument(
+        '--max_workers',
+        type=int,
+        default=4,
+        help="Maximum number of workers when parallel is specified")
+    parser.add_argument(
+        '--profile_memory',
+        action='store_true',
+        help="Profile memory usage")
     args = parser.parse_args()
     return args
 
-def main():
-    args = parse_args()
+def main(args):
     source_mb = strax.utils.dir_size_mb(args.source)
     report = strax.rechunker(source_directory=args.source,
                              dest_directory=args.dest,
@@ -57,9 +73,11 @@
                              compressor=args.compressor,
                              target_size_mb=args.target_size_mb,
                              rechunk=args.rechunk,
+                             parallel={'False': False, 'True': True}.get(args.parallel, args.parallel),
+                             max_workers=args.max_workers,
                              )
     if args.dest is not None:
-        recompressed_mb = strax.utils.dir_size_mb(args.dest)
+        recompressed_mb = strax.utils.dir_size_mb(report.get('dest_directory', args.dest))
     else:
         recompressed_mb = strax.utils.dir_size_mb(args.source)
     report.update(dict(source_mb=source_mb,
@@ -82,4 +100,14 @@
 
 if __name__ == '__main__':
-    main()
+    args = parse_args()
+    if args.profile_memory:
+        from memory_profiler import memory_usage
+        import time
+        start = time.time()
+        mem = memory_usage(proc=(main, (args,)))
+        print(f"Memory profiler says peak RAM usage was: {max(mem):.1f} MB")
+        print(f'Took {time.time() - start:.1f} s = {(time.time() - start) / 3600:.2f} h')
+        print('Bye, bye')
+    else:
+        main(args)
diff --git a/strax/__init__.py b/strax/__init__.py
index bb3a8242c..ae5cd6545 100644
--- a/strax/__init__.py
+++ b/strax/__init__.py
@@ -11,6 +11,7 @@
 from .storage.common import *
 from .storage.files import *
+from .storage.file_rechunker import *
 from .storage.mongo import *
 from .storage.zipfiles import *
diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py
new file mode 100644
index 000000000..e94567978
--- /dev/null
+++ b/strax/storage/file_rechunker.py
@@ -0,0 +1,191 @@
+import tempfile
+import os
+import typing
+import time
+import shutil
+import strax
+from functools import partial
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+
+export, __all__ = strax.exporter()
+
+
+@export
+def rechunker(source_directory: str,
+              dest_directory: typing.Optional[str] = None,
+              replace: bool = False,
+              compressor: typing.Optional[str] = None,
+              target_size_mb: typing.Optional[int] = None,
+              rechunk: bool = True,
+              progress_bar: bool = True,
+              parallel: typing.Union[bool, str] = False,
+              max_workers: int = 4,
+              _timeout: int = 24 * 3600
+              ) -> dict:
+    """
+    Rechunk/Recompress a strax datatype saved in a FileSystemBackend
+    outside the strax.Context.
+    For a user-friendly, context-centered alternative, see
+    strax.Context.copy_to_frontend:
+    github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792  # noqa
+
+    One can either specify a destination directory <dest_directory>
+    where a new copy of this data is stored, or replace the input
+    folder with its rechunked version.
+
+    This function can either:
+     - rechunk (if <rechunk> is True; useful for creating larger chunks)
+     - recompress (if a <compressor> is specified)
+
+    One can also rechunk and recompress simultaneously.
+
+    :param source_directory: Path to a folder containing a single
+        strax.DataType.
+    :param dest_directory: Head of the folder where the new data is
+        written. If nothing is specified, write to a temporary
+        directory.
+    :param replace: Delete the source_directory and replace it with its
+        rechunked version
+    :param compressor: Compressor to be used for saving the rechunked
+        data
+    :param target_size_mb: Target size of chunks (uncompressed). As long
+        as a chunk is smaller than this many MB, keep adding new MBs
+        until the chunk is at least target_size_mb or we run out of
+        chunks.
+    :param rechunk: Do we want to rechunk?
+    :param progress_bar: Display a progress bar
+    :param parallel: Parallelize using a thread pool (True or 'thread')
+        or a process pool ('process'); otherwise run serially
+    :param max_workers: Number of threads/cores to use for parallel
+        processing; only relevant when parallel is specified
+    :param _timeout: Mailbox timeout
+    :return: Dictionary with some information on the write/load times
+        involved.
+    """
+    _check_arguments(source_directory, replace, dest_directory, parallel)
+    backend_key = os.path.basename(os.path.normpath(source_directory))
+    dest_directory, _temp_dir = _get_dest_and_tempdir(dest_directory, replace, backend_key)
+
+    backend = strax.FileSytemBackend(set_target_chunk_mb=target_size_mb)
+    meta_data = _get_meta_data(backend, source_directory, compressor, target_size_mb)
+    source_compressor = meta_data['compressor']
+    executor = _get_executor(parallel, max_workers)
+
+    data_loader = backend.loader(source_directory, executor=executor)
+    pbar = strax.utils.tqdm(total=len(meta_data['chunks']),
+                            disable=not progress_bar,
+                            desc=backend_key)
+    load_time_seconds = []
+
+    def load_wrapper(generator):
+        """Wrap the loader to keep track of the load time"""
+        n_bytes = 0
+        while True:
+            try:
+                t0 = time.time()
+                data = next(generator)
+                t1 = time.time()
+                load_time_seconds.append(t1 - t0)
+                n_bytes += data.nbytes
+                pbar.postfix = f'{(n_bytes / 1e6) / (t1 - pbar.start_t):.1f} MB/s'
+                pbar.n += 1
+                pbar.display()
+            except StopIteration:
+                pbar.close()
+                return
+            yield data
+
+    print(f'Rechunking {source_directory} to {dest_directory}')
+    saver = backend._saver(dest_directory, metadata=meta_data, saver_timeout=_timeout)
+
+    write_time_start = time.time()
+    _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout)
+    if not os.path.exists(dest_directory):
+        raise FileNotFoundError(f'{dest_directory} not found, did one of the savers die?')
+    load_time = sum(load_time_seconds)
+    write_time = time.time() - write_time_start - load_time
+
+    _move_directories(replace, source_directory, dest_directory, _temp_dir)
+
+    return dict(backend_key=backend_key,
+                load_time=load_time,
+                write_time=write_time,
+                uncompressed_mb=sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6,
+                source_compressor=source_compressor,
+                dest_compressor=meta_data['compressor'],
+                dest_directory=dest_directory,
+                )
+
+
+def _check_arguments(source_directory, replace, dest_directory, parallel):
+    if not os.path.exists(source_directory):
+        raise FileNotFoundError(f'No file at {source_directory}')
+    if not replace and dest_directory is None:
+        raise ValueError('Specify a destination path when '
+                         'not replacing the original path')
+    if parallel not in [False, True, 'thread', 'process']:
+        raise ValueError('Choose from False, True, "thread" or "process"')
+
+
+def _get_dest_and_tempdir(dest_directory, replace, backend_key):
+    if dest_directory is None and replace:
+        _temp_dir = tempfile.TemporaryDirectory()
+        dest_directory = _temp_dir.name
+    else:
+        _temp_dir = False
+
+    if os.path.basename(os.path.normpath(dest_directory)) != backend_key:
+        # New key should be correct! If there is not an exact match,
+        # we want to make sure that we append the backend_key correctly
+        print(f'Will write to {dest_directory} and make sub-folder {backend_key}')
+        dest_directory = os.path.join(dest_directory, backend_key)
+    return dest_directory, _temp_dir
+
+
+def _move_directories(replace, source_directory, dest_directory, _temp_dir):
+    if replace:
+        print(f'move {dest_directory} to {source_directory}')
+        shutil.rmtree(source_directory)
+        shutil.move(dest_directory, source_directory)
+        if _temp_dir:
+            _temp_dir.cleanup()
+
+
+def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout):
+    if executor is None:
+        saver.save_from(load_wrapper(data_loader), rechunk=rechunk)
+        return
+
+    mailbox = strax.Mailbox(name='rechunker', timeout=_timeout)
+    mailbox.add_sender(data_loader)
+    mailbox.add_reader(partial(
+        saver.save_from,
+        executor=executor,
+        rechunk=rechunk,
+    ))
+    final_generator = mailbox.subscribe()
+
+    # Make sure everything is added to the mailbox before starting!
+    mailbox.start()
+    for _ in load_wrapper(final_generator):
+        pass
+
+    mailbox.cleanup()
+    executor.shutdown(wait=True)
+
+
+def _get_meta_data(backend, source_directory, compressor, target_size_mb):
+    meta_data = backend.get_metadata(source_directory)
+    if compressor is not None:
+        meta_data['compressor'] = compressor
+    if target_size_mb is not None:
+        meta_data['chunk_target_size_mb'] = target_size_mb
+    return meta_data
+
+
+def _get_executor(parallel, max_workers):
+    # Nested import - prevents circular imports
+    from strax.processor import SHMExecutor
+    # Only construct the executor that is actually requested; building
+    # all of them eagerly (e.g. in a dict) would leak pools that are
+    # never shut down.
+    if parallel in (True, 'thread'):
+        return ThreadPoolExecutor(max_workers)
+    if parallel == 'process':
+        return (ProcessPoolExecutor(max_workers)
+                if SHMExecutor is None
+                else SHMExecutor(max_workers))
+    return None
diff --git a/strax/storage/files.py b/strax/storage/files.py
index 99a09bc6b..12e5d06a8 100644
--- a/strax/storage/files.py
+++ b/strax/storage/files.py
@@ -1,16 +1,12 @@
 import glob
 import json
-import tempfile
 import os
 import os.path as osp
-import typing
-import time
 from bson import json_util
 import shutil
 
 import strax
 from .common import StorageFrontend
-
 export, __all__ = strax.exporter()
 
 RUN_METADATA_PATTERN = '%s-metadata.json'
@@ -215,6 +211,18 @@ class FileSytemBackend(strax.StorageBackend):
     Metadata is stored in a file called metadata.json.
""" + def __init__(self, *args, set_target_chunk_mb: int = None, **kwargs, ): + """ + Add set_chunk_size_mb to strax.StorageBackend to allow changing + the chunk.target_size_mb returned from the loader, any args or kwargs + are passed to the strax.StorageBackend + + :param set_target_chunk_mb: Prior to returning the loaders' chunks, + return the chunk with an updated target size + """ + super().__init__(*args, **kwargs) + self.set_chunk_size_mb = set_target_chunk_mb + def _get_metadata(self, dirname): prefix = dirname_to_prefix(dirname) metadata_json = f'{prefix}-metadata.json' @@ -237,6 +245,12 @@ def _get_metadata(self, dirname): with open(md_path, mode='r') as f: return json.loads(f.read()) + def _read_and_format_chunk(self, *args, **kwargs): + chunk = super()._read_and_format_chunk(*args, **kwargs) + if self.set_chunk_size_mb: + chunk.target_size_mb = self.set_chunk_size_mb + return chunk + def _read_chunk(self, dirname, chunk_info, dtype, compressor): fn = osp.join(dirname, chunk_info['filename']) return strax.load_file(fn, dtype=dtype, compressor=compressor) @@ -365,111 +379,3 @@ def _close(self): @export class InvalidFolderNameFormat(Exception): pass - - -@export -def rechunker(source_directory:str, - dest_directory: typing.Optional[str] = None, - replace: bool = False, - compressor: typing.Optional[str] = None, - target_size_mb: typing.Optional[str] = None, - rechunk: bool = True, - )-> dict: - """ - Rechunk/Recompress a strax-datatype saved in a FileSystemBackend - outside of a strax.Context. For a user-friendly context centered - alternative, see strax.Context.copy_to_frontend: - github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa - - One can either specify a destination directory where to store a new - copy of this data with or replace the input file - with it's rechunked version. - - This function can either: - - rechunk (if is also useful (create larger chunks) - - recompress (if a is specified) - - One can also rechunk and recompress simultaneously - - :param source_directory: Path to a folder containing a single - strax.DataType. - :param dest_directory: Head of a folder whereto write new data. If - nothing is specified, write to a temporary directory. - :param replace: Delete the source_directory and replace it with it's - rechunked version - :param compressor: Compressor to be used in saving the rechunked - data - :param target_size_mb: Target size of chunks (uncompressed). As long - as a chunk is smaller than this many MB, keep adding new MBs - until the chunk is at least target_size_mb or we run out of - chunks. - :param rechunk: Do we want to rechunk? - :return: Dictionary with some information on the write/load times - involved. - """ - if not os.path.exists(source_directory): - raise FileNotFoundError(f'No file at {source_directory}') - if not replace and dest_directory is None: - raise ValueError(f'Specify a destination path when ' - f'not replacing the original path') - backend_key = os.path.basename(os.path.normpath(source_directory)) - - if dest_directory is None and replace: - _temp_dir = tempfile.TemporaryDirectory() - dest_directory = _temp_dir.name - else: - _temp_dir = False - - - if os.path.basename(os.path.normpath(dest_directory)) != backend_key: - # New key should be correct! 
-        # we want to make sure that we append the backend_key correctly
-        print(f'Will write to {dest_directory} and make sub-folder {backend_key}')
-        dest_directory = os.path.join(dest_directory, backend_key)
-    backend = strax.FileSytemBackend()
-    meta_data = backend.get_metadata(source_directory)
-    source_compressor = meta_data['compressor']
-
-    if compressor is not None:
-        meta_data['compressor'] = compressor
-    if target_size_mb is not None:
-        meta_data['chunk_target_size_mb'] = target_size_mb
-
-    data_loader = backend.loader(source_directory)
-
-    load_time_seconds = []
-    def loader():
-        """Wrapped loader for bookkeeping load time"""
-        while True:
-            try:
-                t0 = time.time()
-                data = next(data_loader)
-                # Update target chunk size for re-chunking
-                data.target_size_mb = meta_data['chunk_target_size_mb']
-                load_time_seconds.append(time.time()-t0)
-            except StopIteration:
-                return
-            yield data
-    print(f'Rechunking {source_directory} to {dest_directory}')
-    saver = backend._saver(dest_directory, metadata=meta_data)
-
-    write_time_start = time.time()
-    saver.save_from(loader(), rechunk=rechunk)
-    load_time = sum(load_time_seconds)
-    write_time = time.time() - write_time_start - load_time
-
-    if replace:
-        print(f'move {dest_directory} to {source_directory}')
-        shutil.rmtree(source_directory)
-        shutil.move(dest_directory, source_directory)
-        if _temp_dir:
-            _temp_dir.cleanup()
-
-    return dict(backend_key=backend_key,
-                load_time=load_time,
-                write_time=write_time,
-                uncompressed_mb= sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6,
-                source_compressor=source_compressor,
-                dest_compressor=meta_data['compressor'],
-                )
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 283c9f6b0..05c6f62b9 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -242,7 +242,17 @@ def test_rechunking(self):
             self._rechunking(compressor)
             self.tearDown()
 
-    def _rechunking(self, compressor):
+    def test_rechunk_parallelization(self):
+        for parallel in [True, 'process', False]:
+            with self.subTest(parallel=parallel):
+                self.setUp()
+                self._rechunking(compressor='blosc', parallel=parallel)
+                self.tearDown()
+
+    def test_replace(self):
+        self._rechunking(compressor='blosc', replace=True)
+
+    def _rechunking(self, compressor, parallel=False, replace=False):
         """
         Test that we can use the strax.files.rechunking function to
         rechunk data outside the context
@@ -261,10 +271,13 @@ def _rechunking(self, compressor):
         assert original_n_files > 3  # At least two files + metadata
         _, backend_key = source_sf.find(st.key_for(run_id, self.target))
         strax.rechunker(source_directory=backend_key,
-                        dest_directory=target_path.name,
+                        dest_directory=target_path.name if not replace else None,
                         replace=True,
                         compressor=compressor,
                         target_size_mb=strax.default_chunk_size_mb * 2,
+                        parallel=parallel,
+                        max_workers=4,
+                        _timeout=5,
                         )
         assert st.is_stored(run_id, self.target)
         # Should be empty, we just replaced the source
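
For reference, a minimal usage sketch of the rechunker API introduced in this diff. The data paths and the datatype folder name are hypothetical; any folder holding a single strax datatype inside a strax.DataDirectory works.

    import strax

    report = strax.rechunker(
        source_directory='./strax_data/012345-records-abc123',  # hypothetical datatype folder
        dest_directory='./strax_data_rechunked',  # or None with replace=True to rechunk in place
        compressor='zstd',             # recompress while rechunking
        target_size_mb=200,            # grow chunks to ~200 MB (uncompressed)
        parallel='process',            # False (serial), True/'thread' or 'process'
        max_workers=4,
    )
    print(f"Wrote {report['dest_directory']}: "
          f"load {report['load_time']:.1f} s, write {report['write_time']:.1f} s")

    # Roughly equivalent call through the script above, with optional memory profiling:
    #   bin/rechunker --source ./strax_data/012345-records-abc123 \
    #       --dest ./strax_data_rechunked --parallel process --max_workers 4 --profile_memory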