From 8c33799319a97a96d49fc8e9af4031428f77cd74 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Mon, 6 Feb 2023 11:05:25 +0100 Subject: [PATCH 01/11] add pbar for rechunker --- strax/storage/files.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/strax/storage/files.py b/strax/storage/files.py index 99a09bc6b..a3e254a4b 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -374,6 +374,7 @@ def rechunker(source_directory:str, compressor: typing.Optional[str] = None, target_size_mb: typing.Optional[str] = None, rechunk: bool = True, + progress_bar: bool = True, )-> dict: """ Rechunk/Recompress a strax-datatype saved in a FileSystemBackend @@ -405,6 +406,7 @@ def rechunker(source_directory:str, until the chunk is at least target_size_mb or we run out of chunks. :param rechunk: Do we want to rechunk? + :param progress_bar: display progress bar :return: Dictionary with some information on the write/load times involved. """ @@ -437,7 +439,7 @@ def rechunker(source_directory:str, meta_data['chunk_target_size_mb'] = target_size_mb data_loader = backend.loader(source_directory) - + pbar = strax.utils.tqdm(total = len(meta_data['chunks']), disable=not progress_bar) load_time_seconds = [] def loader(): """Wrapped loader for bookkeeping load time""" @@ -448,7 +450,10 @@ def loader(): # Update target chunk size for re-chunking data.target_size_mb = meta_data['chunk_target_size_mb'] load_time_seconds.append(time.time()-t0) + pbar.n += 1 + pbar.display() except StopIteration: + pbar.close() return yield data print(f'Rechunking {source_directory} to {dest_directory}') From 5db61deadcf5507bb71d8602b15f43fb940a6f97 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 15:31:59 +0100 Subject: [PATCH 02/11] use mailbox system to allow for parallel rechunking --- bin/rechunker | 36 ++++++- strax/__init__.py | 1 + strax/storage/file_rechunker.py | 160 ++++++++++++++++++++++++++++++++ strax/storage/files.py | 133 ++++---------------------- tests/test_storage.py | 9 +- 5 files changed, 218 insertions(+), 121 deletions(-) create mode 100644 strax/storage/file_rechunker.py diff --git a/bin/rechunker b/bin/rechunker index 691f2f5a7..da1a91c89 100644 --- a/bin/rechunker +++ b/bin/rechunker @@ -1,6 +1,8 @@ #!/usr/bin/env python import os.path import argparse +import typing + import strax import pandas as pd @@ -13,7 +15,7 @@ def parse_args(): '--source', type=str, help="Target directory to rechunk, should be a folder in a " - "strax.DataDrictory (one datatype)") + "strax.DataDirectory (one datatype)") parser.add_argument( '--dest', '--destination', default=None, @@ -44,12 +46,26 @@ def parse_args(): type=str, default=None, help="Write some information to this file (csv format)") + parser.add_argument( + '--parallel', + type=str, + default='False', + choices=['False', 'True', 'thread', 'process'], + help="Parallelize using threadpool or processpool") + parser.add_argument( + '--max_workers', + type=int, + default=4, + help="Max workers if parallel is specified") + parser.add_argument( + '--profile_memory', + action='store_true', + help="Profile memory usage") args = parser.parse_args() return args -def main(): - args = parse_args() +def main(args): source_mb = strax.utils.dir_size_mb(args.source) report = strax.rechunker(source_directory=args.source, dest_directory=args.dest, @@ -57,6 +73,8 @@ def main(): compressor=args.compressor, target_size_mb=args.target_size_mb, rechunk=args.rechunk, + parallel={'False': False, 'True': True}.get(args.parallel, args.parallel), + max_workers=args.max_workers, ) if args.dest is not None: recompressed_mb = strax.utils.dir_size_mb(args.dest) @@ -82,4 +100,14 @@ def main(): if __name__ == '__main__': - main() + args = parse_args() + if args.profile_memory: + from memory_profiler import memory_usage + import time + start = time.time() + mem = memory_usage(proc=(main, (args,))) + print(f"Memory profiler says peak RAM usage was: {max(mem):.1f} MB") + print(f'Took {time.time() - start:.1f} s = {(time.time() - start) / 3600:.2f} h ') + print('Bye, bye') + else: + main(args) diff --git a/strax/__init__.py b/strax/__init__.py index bb3a8242c..ae5cd6545 100644 --- a/strax/__init__.py +++ b/strax/__init__.py @@ -11,6 +11,7 @@ from .storage.common import * from .storage.files import * +from .storage.file_rechunker import * from .storage.mongo import * from .storage.zipfiles import * diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py new file mode 100644 index 000000000..bb75de750 --- /dev/null +++ b/strax/storage/file_rechunker.py @@ -0,0 +1,160 @@ +import tempfile +import os +import typing +import time + +import shutil + +import strax + +from functools import partial + +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from .files import FileSytemBackend +export, __all__ = strax.exporter() + +@export +def rechunker(source_directory:str, + dest_directory: typing.Optional[str] = None, + replace: bool = False, + compressor: typing.Optional[str] = None, + target_size_mb: typing.Optional[str] = None, + rechunk: bool = True, + progress_bar: bool = True, + parallel : typing.Union[bool,str] = False, + max_workers: int = 4, + _timeout: int = 24*3600 + )-> dict: + """ + Rechunk/Recompress a strax-datatype saved in a FileSystemBackend + outside the strax.Context. For a user-friendly context centered + alternative, see strax.Context.copy_to_frontend: + github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa + + One can either specify a destination directory where to store a new + copy of this data with or replace the input file + with it's rechunked version. + + This function can either: + - rechunk (if is also useful (create larger chunks) + - recompress (if a is specified) + + One can also rechunk and recompress simultaneously + + :param source_directory: Path to a folder containing a single + strax.DataType. + :param dest_directory: Head of a folder whereto write new data. If + nothing is specified, write to a temporary directory. + :param replace: Delete the source_directory and replace it with it's + rechunked version + :param compressor: Compressor to be used in saving the rechunked + data + :param target_size_mb: Target size of chunks (uncompressed). As long + as a chunk is smaller than this many MB, keep adding new MBs + until the chunk is at least target_size_mb or we run out of + chunks. + :param rechunk: Do we want to rechunk? + :param progress_bar: display progress bar + :param parallel: Parallelize using threadpool or process pool (otherwise run in serial) + :param max_workers: number of threads/cores to associate to the parallel + processing, only relevant when parallel is specified + :param _timeout: Mailbox timeout + :return: Dictionary with some information on the write/load times + involved. + """ + if not os.path.exists(source_directory): + raise FileNotFoundError(f'No file at {source_directory}') + if not replace and dest_directory is None: + raise ValueError(f'Specify a destination path when ' + f'not replacing the original path') + if parallel not in [False, True, 'thread', 'process']: + raise ValueError('Choose from False, "thread" or "process"') + backend_key = os.path.basename(os.path.normpath(source_directory)) + + if dest_directory is None and replace: + _temp_dir = tempfile.TemporaryDirectory() + dest_directory = _temp_dir.name + else: + _temp_dir = False + + if os.path.basename(os.path.normpath(dest_directory)) != backend_key: + # New key should be correct! If there is not an exact match, + # we want to make sure that we append the backend_key correctly + print(f'Will write to {dest_directory} and make sub-folder {backend_key}') + dest_directory = os.path.join(dest_directory, backend_key) + backend = strax.FileSytemBackend(set_target_chunk_mb=target_size_mb) + meta_data = backend.get_metadata(source_directory) + source_compressor = meta_data['compressor'] + + # nested import - prevent circular imports + from strax.processor import SHMExecutor + executor = { + True: ThreadPoolExecutor(max_workers), + 'thread': ThreadPoolExecutor(max_workers), + 'process': ProcessPoolExecutor(max_workers) if SHMExecutor is None else SHMExecutor(max_workers), + }.get(parallel) + + if compressor is not None: + meta_data['compressor'] = compressor + if target_size_mb is not None: + meta_data['chunk_target_size_mb'] = target_size_mb + + data_loader = backend.loader(source_directory, executor=executor) + pbar = strax.utils.tqdm(total=len(meta_data['chunks']), disable=not progress_bar) + load_time_seconds = [] + + def load_wrapper(generator): + n_bytes = 0 + """Wrapped loader for bookkeeping load time""" + while True: + try: + t0 = time.time() + data = next(generator) + t1 = time.time() + load_time_seconds.append(t1-t0) + n_bytes += data.nbytes + pbar.postfix = f'{(n_bytes/1e6)/(t1-pbar.start_t):.1f} MB/s' + pbar.n += 1 + pbar.display() + except StopIteration: + pbar.close() + return + yield data + print(f'Rechunking {source_directory} to {dest_directory}') + saver = backend._saver(dest_directory, metadata=meta_data) + + write_time_start = time.time() + if executor is None: + saver.save_from(load_wrapper(data_loader), rechunk=rechunk) + else: + mailbox = strax.Mailbox(name='rechunker', timeout=_timeout) + mailbox.add_sender(data_loader) + mailbox.add_reader(partial( + saver.save_from, + executor=executor, + rechunk=rechunk, + )) + mailbox.start() + final_generator = mailbox.subscribe() + for _ in load_wrapper(final_generator): + pass + mailbox.cleanup() + executor.shutdown(wait=True) + load_time = sum(load_time_seconds) + write_time = time.time() - write_time_start - load_time + + if replace: + print(f'move {dest_directory} to {source_directory}') + shutil.rmtree(source_directory) + shutil.move(dest_directory, source_directory) + if _temp_dir: + _temp_dir.cleanup() + + return dict(backend_key=backend_key, + load_time=load_time, + write_time=write_time, + uncompressed_mb= sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6, + source_compressor=source_compressor, + dest_compressor=meta_data['compressor'], + ) diff --git a/strax/storage/files.py b/strax/storage/files.py index a3e254a4b..3b1cc008f 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -1,16 +1,12 @@ import glob import json -import tempfile import os import os.path as osp -import typing -import time from bson import json_util import shutil import strax from .common import StorageFrontend - export, __all__ = strax.exporter() RUN_METADATA_PATTERN = '%s-metadata.json' @@ -215,6 +211,18 @@ class FileSytemBackend(strax.StorageBackend): Metadata is stored in a file called metadata.json. """ + def __init__(self, *args, set_target_chunk_mb: int = None, **kwargs, ): + """ + Add set_chunk_size_mb to strax.StorageBackend to allow changing + the chunk.target_size_mb returned from the loader, any args or kwargs + are passed to the strax.StorageBackend + + :param set_target_chunk_mb: Prior to returning the loaders' chunks, + return the chunk with an updated target size + """ + super().__init__(*args, **kwargs) + self.set_chunk_size_mb = set_target_chunk_mb + def _get_metadata(self, dirname): prefix = dirname_to_prefix(dirname) metadata_json = f'{prefix}-metadata.json' @@ -236,6 +244,11 @@ def _get_metadata(self, dirname): with open(md_path, mode='r') as f: return json.loads(f.read()) + def _read_and_format_chunk(self, *args, **kwargs): + chunk = super()._read_and_format_chunk(*args, **kwargs) + if self.set_chunk_size_mb: + chunk.target_size_mb = self.set_chunk_size_mb + return chunk def _read_chunk(self, dirname, chunk_info, dtype, compressor): fn = osp.join(dirname, chunk_info['filename']) @@ -366,115 +379,3 @@ def _close(self): class InvalidFolderNameFormat(Exception): pass - -@export -def rechunker(source_directory:str, - dest_directory: typing.Optional[str] = None, - replace: bool = False, - compressor: typing.Optional[str] = None, - target_size_mb: typing.Optional[str] = None, - rechunk: bool = True, - progress_bar: bool = True, - )-> dict: - """ - Rechunk/Recompress a strax-datatype saved in a FileSystemBackend - outside of a strax.Context. For a user-friendly context centered - alternative, see strax.Context.copy_to_frontend: - github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa - - One can either specify a destination directory where to store a new - copy of this data with or replace the input file - with it's rechunked version. - - This function can either: - - rechunk (if is also useful (create larger chunks) - - recompress (if a is specified) - - One can also rechunk and recompress simultaneously - - :param source_directory: Path to a folder containing a single - strax.DataType. - :param dest_directory: Head of a folder whereto write new data. If - nothing is specified, write to a temporary directory. - :param replace: Delete the source_directory and replace it with it's - rechunked version - :param compressor: Compressor to be used in saving the rechunked - data - :param target_size_mb: Target size of chunks (uncompressed). As long - as a chunk is smaller than this many MB, keep adding new MBs - until the chunk is at least target_size_mb or we run out of - chunks. - :param rechunk: Do we want to rechunk? - :param progress_bar: display progress bar - :return: Dictionary with some information on the write/load times - involved. - """ - if not os.path.exists(source_directory): - raise FileNotFoundError(f'No file at {source_directory}') - if not replace and dest_directory is None: - raise ValueError(f'Specify a destination path when ' - f'not replacing the original path') - backend_key = os.path.basename(os.path.normpath(source_directory)) - - if dest_directory is None and replace: - _temp_dir = tempfile.TemporaryDirectory() - dest_directory = _temp_dir.name - else: - _temp_dir = False - - - if os.path.basename(os.path.normpath(dest_directory)) != backend_key: - # New key should be correct! If there is not an exact match, - # we want to make sure that we append the backend_key correctly - print(f'Will write to {dest_directory} and make sub-folder {backend_key}') - dest_directory = os.path.join(dest_directory, backend_key) - backend = strax.FileSytemBackend() - meta_data = backend.get_metadata(source_directory) - source_compressor = meta_data['compressor'] - - if compressor is not None: - meta_data['compressor'] = compressor - if target_size_mb is not None: - meta_data['chunk_target_size_mb'] = target_size_mb - - data_loader = backend.loader(source_directory) - pbar = strax.utils.tqdm(total = len(meta_data['chunks']), disable=not progress_bar) - load_time_seconds = [] - def loader(): - """Wrapped loader for bookkeeping load time""" - while True: - try: - t0 = time.time() - data = next(data_loader) - # Update target chunk size for re-chunking - data.target_size_mb = meta_data['chunk_target_size_mb'] - load_time_seconds.append(time.time()-t0) - pbar.n += 1 - pbar.display() - except StopIteration: - pbar.close() - return - yield data - print(f'Rechunking {source_directory} to {dest_directory}') - saver = backend._saver(dest_directory, metadata=meta_data) - - write_time_start = time.time() - saver.save_from(loader(), rechunk=rechunk) - load_time = sum(load_time_seconds) - write_time = time.time() - write_time_start - load_time - - if replace: - print(f'move {dest_directory} to {source_directory}') - shutil.rmtree(source_directory) - shutil.move(dest_directory, source_directory) - if _temp_dir: - _temp_dir.cleanup() - - return dict(backend_key=backend_key, - load_time=load_time, - write_time=write_time, - uncompressed_mb= sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6, - source_compressor=source_compressor, - dest_compressor=meta_data['compressor'], - ) diff --git a/tests/test_storage.py b/tests/test_storage.py index 283c9f6b0..3685b08d0 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -242,7 +242,14 @@ def test_rechunking(self): self._rechunking(compressor) self.tearDown() - def _rechunking(self, compressor): + def test_rechunk_prarillellization(self): + for parallel in [False, True, 'process']: + with self.subTest(parallel = parallel): + self.setUp() + self._rechunking(compressor = 'blocs', parallel = parallel) + self.tearDown() + + def _rechunking(self, compressor, parallel=False): """ Test that we can use the strax.files.rechunking function to rechunk data outside the context From 231b65b7f010c09005a379d67a3de6c81914d634 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 15:53:41 +0100 Subject: [PATCH 03/11] also test --- tests/test_storage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_storage.py b/tests/test_storage.py index 3685b08d0..b24b14f31 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -272,6 +272,7 @@ def _rechunking(self, compressor, parallel=False): replace=True, compressor=compressor, target_size_mb=strax.default_chunk_size_mb * 2, + parallel=parallel, ) assert st.is_stored(run_id, self.target) # Should be empty, we just replaced the source From 966ce8b0ab29cd0127d96c82c114b9cf9cde8965 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 16:20:46 +0100 Subject: [PATCH 04/11] fix codefactor --- bin/rechunker | 2 +- strax/storage/file_rechunker.py | 136 ++++++++++++++++++-------------- tests/test_storage.py | 5 +- 3 files changed, 79 insertions(+), 64 deletions(-) diff --git a/bin/rechunker b/bin/rechunker index da1a91c89..2f760e2ff 100644 --- a/bin/rechunker +++ b/bin/rechunker @@ -77,7 +77,7 @@ def main(args): max_workers=args.max_workers, ) if args.dest is not None: - recompressed_mb = strax.utils.dir_size_mb(args.dest) + recompressed_mb = strax.utils.dir_size_mb(report.get('dest_directory', args.dest)) else: recompressed_mb = strax.utils.dir_size_mb(args.source) report.update(dict(source_mb=source_mb, diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py index bb75de750..142f107e3 100644 --- a/strax/storage/file_rechunker.py +++ b/strax/storage/file_rechunker.py @@ -2,29 +2,27 @@ import os import typing import time - import shutil - import strax - from functools import partial - from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor -from .files import FileSytemBackend + + export, __all__ = strax.exporter() + @export -def rechunker(source_directory:str, +def rechunker(source_directory: str, dest_directory: typing.Optional[str] = None, replace: bool = False, compressor: typing.Optional[str] = None, target_size_mb: typing.Optional[str] = None, rechunk: bool = True, progress_bar: bool = True, - parallel : typing.Union[bool,str] = False, + parallel: typing.Union[bool, str] = False, max_workers: int = 4, - _timeout: int = 24*3600 - )-> dict: + _timeout: int = 24 * 3600 + ) -> dict: """ Rechunk/Recompress a strax-datatype saved in a FileSystemBackend outside the strax.Context. For a user-friendly context centered @@ -63,68 +61,86 @@ def rechunker(source_directory:str, :return: Dictionary with some information on the write/load times involved. """ - if not os.path.exists(source_directory): - raise FileNotFoundError(f'No file at {source_directory}') - if not replace and dest_directory is None: - raise ValueError(f'Specify a destination path when ' - f'not replacing the original path') - if parallel not in [False, True, 'thread', 'process']: - raise ValueError('Choose from False, "thread" or "process"') + _check_arguments(source_directory, replace, dest_directory, parallel) backend_key = os.path.basename(os.path.normpath(source_directory)) + dest_directory, _temp_dir = _get_dest_and_tempdir(dest_directory, replace, backend_key) - if dest_directory is None and replace: - _temp_dir = tempfile.TemporaryDirectory() - dest_directory = _temp_dir.name - else: - _temp_dir = False - - if os.path.basename(os.path.normpath(dest_directory)) != backend_key: - # New key should be correct! If there is not an exact match, - # we want to make sure that we append the backend_key correctly - print(f'Will write to {dest_directory} and make sub-folder {backend_key}') - dest_directory = os.path.join(dest_directory, backend_key) backend = strax.FileSytemBackend(set_target_chunk_mb=target_size_mb) - meta_data = backend.get_metadata(source_directory) + meta_data = _get_meta_data(backend, source_directory, compressor, target_size_mb) source_compressor = meta_data['compressor'] - - # nested import - prevent circular imports - from strax.processor import SHMExecutor - executor = { - True: ThreadPoolExecutor(max_workers), - 'thread': ThreadPoolExecutor(max_workers), - 'process': ProcessPoolExecutor(max_workers) if SHMExecutor is None else SHMExecutor(max_workers), - }.get(parallel) - - if compressor is not None: - meta_data['compressor'] = compressor - if target_size_mb is not None: - meta_data['chunk_target_size_mb'] = target_size_mb + executor = _get_executor(parallel, max_workers) data_loader = backend.loader(source_directory, executor=executor) pbar = strax.utils.tqdm(total=len(meta_data['chunks']), disable=not progress_bar) load_time_seconds = [] def load_wrapper(generator): - n_bytes = 0 """Wrapped loader for bookkeeping load time""" + n_bytes = 0 while True: try: t0 = time.time() data = next(generator) t1 = time.time() - load_time_seconds.append(t1-t0) + load_time_seconds.append(t1 - t0) n_bytes += data.nbytes - pbar.postfix = f'{(n_bytes/1e6)/(t1-pbar.start_t):.1f} MB/s' + pbar.postfix = f'{(n_bytes / 1e6) / (t1 - pbar.start_t):.1f} MB/s' pbar.n += 1 pbar.display() except StopIteration: pbar.close() return yield data + print(f'Rechunking {source_directory} to {dest_directory}') saver = backend._saver(dest_directory, metadata=meta_data) write_time_start = time.time() + _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout) + load_time = sum(load_time_seconds) + write_time = time.time() - write_time_start - load_time + + _move_directories(replace, source_directory, dest_directory, _temp_dir) + + return dict(backend_key=backend_key, + load_time=load_time, + write_time=write_time, + uncompressed_mb=sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6, + source_compressor=source_compressor, + dest_compressor=meta_data['compressor'], + dest_directory=dest_directory, + ) + +def _check_arguments(source_directory, replace, dest_directory, parallel): + if not os.path.exists(source_directory): + raise FileNotFoundError(f'No file at {source_directory}') + if not replace and dest_directory is None: + raise ValueError(f'Specify a destination path when ' + f'not replacing the original path') + if parallel not in [False, True, 'thread', 'process']: + raise ValueError('Choose from False, "thread" or "process"') + +def _get_dest_and_tempdir(dest_directory, replace, backend_key): + if dest_directory is None and replace: + _temp_dir = tempfile.TemporaryDirectory() + dest_directory = _temp_dir.name + else: + _temp_dir = False + + if os.path.basename(os.path.normpath(dest_directory)) != backend_key: + # New key should be correct! If there is not an exact match, + # we want to make sure that we append the backend_key correctly + print(f'Will write to {dest_directory} and make sub-folder {backend_key}') + dest_directory = os.path.join(dest_directory, backend_key) + return dest_directory, _temp_dir +def _move_directories(replace, source_directory, dest_directory, _temp_dir): + if replace: + print(f'move {dest_directory} to {source_directory}') + shutil.rmtree(source_directory) + shutil.move(dest_directory, source_directory) + if _temp_dir: + _temp_dir.cleanup() +def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout): if executor is None: saver.save_from(load_wrapper(data_loader), rechunk=rechunk) else: @@ -141,20 +157,18 @@ def load_wrapper(generator): pass mailbox.cleanup() executor.shutdown(wait=True) - load_time = sum(load_time_seconds) - write_time = time.time() - write_time_start - load_time - - if replace: - print(f'move {dest_directory} to {source_directory}') - shutil.rmtree(source_directory) - shutil.move(dest_directory, source_directory) - if _temp_dir: - _temp_dir.cleanup() - - return dict(backend_key=backend_key, - load_time=load_time, - write_time=write_time, - uncompressed_mb= sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6, - source_compressor=source_compressor, - dest_compressor=meta_data['compressor'], - ) +def _get_meta_data(backend,source_directory, compressor, target_size_mb): + meta_data = backend.get_metadata(source_directory) + if compressor is not None: + meta_data['compressor'] = compressor + if target_size_mb is not None: + meta_data['chunk_target_size_mb'] = target_size_mb + return meta_data +def _get_executor(parallel, max_workers): + # nested import - prevent circular imports + from strax.processor import SHMExecutor + return { + True: ThreadPoolExecutor(max_workers), + 'thread': ThreadPoolExecutor(max_workers), + 'process': ProcessPoolExecutor(max_workers) if SHMExecutor is None else SHMExecutor(max_workers), + }.get(parallel) diff --git a/tests/test_storage.py b/tests/test_storage.py index b24b14f31..bba693019 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -242,11 +242,11 @@ def test_rechunking(self): self._rechunking(compressor) self.tearDown() - def test_rechunk_prarillellization(self): + def test_rechunk_parallelization(self): for parallel in [False, True, 'process']: with self.subTest(parallel = parallel): self.setUp() - self._rechunking(compressor = 'blocs', parallel = parallel) + self._rechunking(compressor = 'blosc', parallel = parallel) self.tearDown() def _rechunking(self, compressor, parallel=False): @@ -273,6 +273,7 @@ def _rechunking(self, compressor, parallel=False): compressor=compressor, target_size_mb=strax.default_chunk_size_mb * 2, parallel=parallel, + max_workers=2, ) assert st.is_stored(run_id, self.target) # Should be empty, we just replaced the source From b0e208b603507ca419b96a53054e336743fd02c5 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 16:21:33 +0100 Subject: [PATCH 05/11] fix it more --- strax/storage/files.py | 1 - 1 file changed, 1 deletion(-) diff --git a/strax/storage/files.py b/strax/storage/files.py index 3b1cc008f..85e8da115 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -378,4 +378,3 @@ def _close(self): @export class InvalidFolderNameFormat(Exception): pass - From ad05af014f7224b267417835e13e4772a1b1420f Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 16:25:57 +0100 Subject: [PATCH 06/11] add timeout for tests --- strax/storage/file_rechunker.py | 42 ++++++++++++++++++++------------- tests/test_storage.py | 1 + 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py index 142f107e3..b89bb4e03 100644 --- a/strax/storage/file_rechunker.py +++ b/strax/storage/file_rechunker.py @@ -7,7 +7,6 @@ from functools import partial from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor - export, __all__ = strax.exporter() @@ -111,6 +110,7 @@ def load_wrapper(generator): dest_directory=dest_directory, ) + def _check_arguments(source_directory, replace, dest_directory, parallel): if not os.path.exists(source_directory): raise FileNotFoundError(f'No file at {source_directory}') @@ -120,6 +120,7 @@ def _check_arguments(source_directory, replace, dest_directory, parallel): if parallel not in [False, True, 'thread', 'process']: raise ValueError('Choose from False, "thread" or "process"') + def _get_dest_and_tempdir(dest_directory, replace, backend_key): if dest_directory is None and replace: _temp_dir = tempfile.TemporaryDirectory() @@ -133,6 +134,8 @@ def _get_dest_and_tempdir(dest_directory, replace, backend_key): print(f'Will write to {dest_directory} and make sub-folder {backend_key}') dest_directory = os.path.join(dest_directory, backend_key) return dest_directory, _temp_dir + + def _move_directories(replace, source_directory, dest_directory, _temp_dir): if replace: print(f'move {dest_directory} to {source_directory}') @@ -140,30 +143,37 @@ def _move_directories(replace, source_directory, dest_directory, _temp_dir): shutil.move(dest_directory, source_directory) if _temp_dir: _temp_dir.cleanup() + + def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout): if executor is None: saver.save_from(load_wrapper(data_loader), rechunk=rechunk) - else: - mailbox = strax.Mailbox(name='rechunker', timeout=_timeout) - mailbox.add_sender(data_loader) - mailbox.add_reader(partial( - saver.save_from, - executor=executor, - rechunk=rechunk, - )) - mailbox.start() - final_generator = mailbox.subscribe() - for _ in load_wrapper(final_generator): - pass - mailbox.cleanup() - executor.shutdown(wait=True) -def _get_meta_data(backend,source_directory, compressor, target_size_mb): + return + + mailbox = strax.Mailbox(name='rechunker', timeout=_timeout) + mailbox.add_sender(data_loader) + mailbox.add_reader(partial( + saver.save_from, + executor=executor, + rechunk=rechunk, + )) + mailbox.start() + final_generator = mailbox.subscribe() + for _ in load_wrapper(final_generator): + pass + mailbox.cleanup() + executor.shutdown(wait=True) + + +def _get_meta_data(backend, source_directory, compressor, target_size_mb): meta_data = backend.get_metadata(source_directory) if compressor is not None: meta_data['compressor'] = compressor if target_size_mb is not None: meta_data['chunk_target_size_mb'] = target_size_mb return meta_data + + def _get_executor(parallel, max_workers): # nested import - prevent circular imports from strax.processor import SHMExecutor diff --git a/tests/test_storage.py b/tests/test_storage.py index bba693019..29ddbb75c 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -274,6 +274,7 @@ def _rechunking(self, compressor, parallel=False): target_size_mb=strax.default_chunk_size_mb * 2, parallel=parallel, max_workers=2, + _timeout=90, ) assert st.is_stored(run_id, self.target) # Should be empty, we just replaced the source From 555196142e335407495b051040b0c02b01bd837a Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 17:17:22 +0100 Subject: [PATCH 07/11] fix hang --- strax/storage/file_rechunker.py | 4 +++- tests/test_storage.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py index b89bb4e03..68879cd5e 100644 --- a/strax/storage/file_rechunker.py +++ b/strax/storage/file_rechunker.py @@ -157,8 +157,10 @@ def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _tim executor=executor, rechunk=rechunk, )) - mailbox.start() final_generator = mailbox.subscribe() + + # Make sure everything is added to the mailbox before starting! + mailbox.start() for _ in load_wrapper(final_generator): pass mailbox.cleanup() diff --git a/tests/test_storage.py b/tests/test_storage.py index 29ddbb75c..766e87aa7 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -243,13 +243,16 @@ def test_rechunking(self): self.tearDown() def test_rechunk_parallelization(self): - for parallel in [False, True, 'process']: + for parallel in [True, 'process', False]: with self.subTest(parallel = parallel): self.setUp() self._rechunking(compressor = 'blosc', parallel = parallel) self.tearDown() - def _rechunking(self, compressor, parallel=False): + def test_replace(self): + self._rechunking(replace=True) + + def _rechunking(self, compressor, parallel=False, replace=False): """ Test that we can use the strax.files.rechunking function to rechunk data outside the context @@ -268,13 +271,13 @@ def _rechunking(self, compressor, parallel=False): assert original_n_files > 3 # At least two files + metadata _, backend_key = source_sf.find(st.key_for(run_id, self.target)) strax.rechunker(source_directory=backend_key, - dest_directory=target_path.name, + dest_directory=target_path.name if not replace else None, replace=True, compressor=compressor, target_size_mb=strax.default_chunk_size_mb * 2, parallel=parallel, - max_workers=2, - _timeout=90, + max_workers=4, + _timeout=5, ) assert st.is_stored(run_id, self.target) # Should be empty, we just replaced the source From c916a229029d4ac080adb5c591d1869b0f90536a Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 17:19:40 +0100 Subject: [PATCH 08/11] add missing line --- strax/storage/files.py | 1 + 1 file changed, 1 insertion(+) diff --git a/strax/storage/files.py b/strax/storage/files.py index 85e8da115..12e5d06a8 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -244,6 +244,7 @@ def _get_metadata(self, dirname): with open(md_path, mode='r') as f: return json.loads(f.read()) + def _read_and_format_chunk(self, *args, **kwargs): chunk = super()._read_and_format_chunk(*args, **kwargs) if self.set_chunk_size_mb: From 4c3035d083be4798f143fa5edffb40395a84e65f Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 17:23:32 +0100 Subject: [PATCH 09/11] I like verbose pbars --- strax/storage/file_rechunker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py index 68879cd5e..37c8f345a 100644 --- a/strax/storage/file_rechunker.py +++ b/strax/storage/file_rechunker.py @@ -70,7 +70,9 @@ def rechunker(source_directory: str, executor = _get_executor(parallel, max_workers) data_loader = backend.loader(source_directory, executor=executor) - pbar = strax.utils.tqdm(total=len(meta_data['chunks']), disable=not progress_bar) + pbar = strax.utils.tqdm(total=len(meta_data['chunks']), + disable=not progress_bar, + desc=backend_key) load_time_seconds = [] def load_wrapper(generator): From bba16792c630ba3ae1bb039e116a43356f14fe90 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Wed, 8 Feb 2023 17:29:49 +0100 Subject: [PATCH 10/11] fix test --- tests/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index 766e87aa7..05c6f62b9 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -250,7 +250,7 @@ def test_rechunk_parallelization(self): self.tearDown() def test_replace(self): - self._rechunking(replace=True) + self._rechunking(compressor = 'blosc', replace=True) def _rechunking(self, compressor, parallel=False, replace=False): """ From 06a3426818c8d2e8b16705b059f0bdb79dd2ca56 Mon Sep 17 00:00:00 2001 From: Joran Angevaare Date: Mon, 13 Feb 2023 11:16:03 +0100 Subject: [PATCH 11/11] fix timeout issues --- strax/storage/file_rechunker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/strax/storage/file_rechunker.py b/strax/storage/file_rechunker.py index 37c8f345a..e94567978 100644 --- a/strax/storage/file_rechunker.py +++ b/strax/storage/file_rechunker.py @@ -94,10 +94,12 @@ def load_wrapper(generator): yield data print(f'Rechunking {source_directory} to {dest_directory}') - saver = backend._saver(dest_directory, metadata=meta_data) + saver = backend._saver(dest_directory, metadata=meta_data, saver_timeout=_timeout) write_time_start = time.time() _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout) + if not os.path.exists(dest_directory): + raise FileNotFoundError(f'{dest_directory} not found, did one of the savers die?') load_time = sum(load_time_seconds) write_time = time.time() - write_time_start - load_time @@ -165,6 +167,7 @@ def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _tim mailbox.start() for _ in load_wrapper(final_generator): pass + mailbox.cleanup() executor.shutdown(wait=True)