Rechunker using Mailbox #710

Merged · 11 commits · Feb 13, 2023

Changes from all commits
38 changes: 33 additions & 5 deletions bin/rechunker
@@ -1,6 +1,8 @@
#!/usr/bin/env python
import os.path
import argparse
import typing

import strax
import pandas as pd

@@ -13,7 +15,7 @@ def parse_args():
'--source',
type=str,
help="Target directory to rechunk, should be a folder in a "
"strax.DataDrictory (one datatype)")
"strax.DataDirectory (one datatype)")
parser.add_argument(
'--dest', '--destination',
default=None,
@@ -44,22 +46,38 @@ def parse_args():
type=str,
default=None,
help="Write some information to this file (csv format)")
parser.add_argument(
'--parallel',
type=str,
default='False',
choices=['False', 'True', 'thread', 'process'],
help="Parallelize using threadpool or processpool")
parser.add_argument(
'--max_workers',
type=int,
default=4,
help="Max workers if parallel is specified")
parser.add_argument(
'--profile_memory',
action='store_true',
help="Profile memory usage")
args = parser.parse_args()
return args


def main(args):
source_mb = strax.utils.dir_size_mb(args.source)
report = strax.rechunker(source_directory=args.source,
dest_directory=args.dest,
replace=args.dest is None,
compressor=args.compressor,
target_size_mb=args.target_size_mb,
rechunk=args.rechunk,
parallel={'False': False, 'True': True}.get(args.parallel, args.parallel),
max_workers=args.max_workers,
)
if args.dest is not None:
recompressed_mb = strax.utils.dir_size_mb(report.get('dest_directory', args.dest))
else:
recompressed_mb = strax.utils.dir_size_mb(args.source)
report.update(dict(source_mb=source_mb,
@@ -82,4 +100,14 @@ def main():


if __name__ == '__main__':
args = parse_args()
if args.profile_memory:
from memory_profiler import memory_usage
import time
start = time.time()
mem = memory_usage(proc=(main, (args,)))
print(f"Memory profiler says peak RAM usage was: {max(mem):.1f} MB")
print(f'Took {time.time() - start:.1f} s = {(time.time() - start) / 3600:.2f} h ')
print('Bye, bye')
else:
main(args)
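Since `main()` now takes the parsed arguments explicitly, the entry point can also be driven programmatically, e.g. from a test. A minimal sketch, assuming hypothetical paths and values; flags defined in the collapsed hunks of this diff may also need to be set:

```python
import argparse

# Hypothetical invocation of the refactored entry point. Field names
# mirror the parser flags above; paths/values are placeholders, and
# flags hidden in the collapsed hunks may also be required.
args = argparse.Namespace(
    source='./strax_data/012345-raw_records-abcdefgh',  # one strax datatype
    dest='./rechunked',         # None would rechunk the source in place
    compressor='zstd',          # assumed to be an available strax compressor
    target_size_mb=500,
    rechunk=True,
    parallel='thread',          # one of 'False', 'True', 'thread', 'process'
    max_workers=4,
)
main(args)
```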
1 change: 1 addition & 0 deletions strax/__init__.py
@@ -11,6 +11,7 @@

from .storage.common import *
from .storage.files import *
from .storage.file_rechunker import *
from .storage.mongo import *
from .storage.zipfiles import *

191 changes: 191 additions & 0 deletions strax/storage/file_rechunker.py
@@ -0,0 +1,191 @@
import tempfile
import os
import typing
import time
import shutil
import strax
from functools import partial
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

export, __all__ = strax.exporter()


@export
def rechunker(source_directory: str,
dest_directory: typing.Optional[str] = None,
replace: bool = False,
compressor: typing.Optional[str] = None,
target_size_mb: typing.Optional[int] = None,
rechunk: bool = True,
progress_bar: bool = True,
parallel: typing.Union[bool, str] = False,
max_workers: int = 4,
_timeout: int = 24 * 3600
) -> dict:
"""
Rechunk/Recompress a strax-datatype saved in a FileSystemBackend
outside the strax.Context. For a user-friendly, context-centered
alternative, see strax.Context.copy_to_frontend:
github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa

One can either specify a destination directory in which to store a new
copy of this data with <dest_directory>, or replace the input file
with its rechunked version.

This function can either:
- rechunk (if <rechunk> is True); increasing the <target_size_mb>
is usually also useful (creates larger chunks)
- recompress (if a <compressor> is specified)

One can also rechunk and recompress simultaneously

:param source_directory: Path to a folder containing a single
strax.DataType.
:param dest_directory: Head of a folder whereto write new data. If
nothing is specified, write to a temporary directory.
:param replace: Delete the source_directory and replace it with its
rechunked version
:param compressor: Compressor to be used in saving the rechunked
data
:param target_size_mb: Target size of chunks (uncompressed). As long
as a chunk is smaller than this many MB, keep adding data until
the chunk is at least target_size_mb or we run out of chunks.
:param rechunk: Do we want to rechunk?
:param progress_bar: display progress bar
:param parallel: Parallelize using a thread pool or process pool (otherwise run serially)
:param max_workers: number of threads/cores to use for the parallel
processing; only relevant when parallel is specified
:param _timeout: Mailbox timeout
:return: Dictionary with some information on the write/load times
involved.
"""
_check_arguments(source_directory, replace, dest_directory, parallel)
backend_key = os.path.basename(os.path.normpath(source_directory))
dest_directory, _temp_dir = _get_dest_and_tempdir(dest_directory, replace, backend_key)

backend = strax.FileSytemBackend(set_target_chunk_mb=target_size_mb)
meta_data = _get_meta_data(backend, source_directory, compressor, target_size_mb)
source_compressor = meta_data['compressor']
executor = _get_executor(parallel, max_workers)

data_loader = backend.loader(source_directory, executor=executor)
pbar = strax.utils.tqdm(total=len(meta_data['chunks']),
disable=not progress_bar,
desc=backend_key)
load_time_seconds = []

def load_wrapper(generator):
"""Wrapped loader for bookkeeping load time"""
n_bytes = 0
while True:
try:
t0 = time.time()
data = next(generator)
t1 = time.time()
load_time_seconds.append(t1 - t0)
n_bytes += data.nbytes
pbar.postfix = f'{(n_bytes / 1e6) / (t1 - pbar.start_t):.1f} MB/s'
pbar.n += 1
pbar.display()
except StopIteration:
pbar.close()
return
yield data

print(f'Rechunking {source_directory} to {dest_directory}')
saver = backend._saver(dest_directory, metadata=meta_data, saver_timeout=_timeout)

write_time_start = time.time()
_exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout)
if not os.path.exists(dest_directory):
raise FileNotFoundError(f'{dest_directory} not found, did one of the savers die?')
load_time = sum(load_time_seconds)
write_time = time.time() - write_time_start - load_time

_move_directories(replace, source_directory, dest_directory, _temp_dir)

return dict(backend_key=backend_key,
load_time=load_time,
write_time=write_time,
uncompressed_mb=sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6,
source_compressor=source_compressor,
dest_compressor=meta_data['compressor'],
dest_directory=dest_directory,
)


def _check_arguments(source_directory, replace, dest_directory, parallel):
if not os.path.exists(source_directory):
raise FileNotFoundError(f'No file at {source_directory}')
if not replace and dest_directory is None:
raise ValueError('Specify a destination path <dest_directory> when '
'not replacing the original path')
if parallel not in [False, True, 'thread', 'process']:
raise ValueError('Choose from False, "thread" or "process"')


def _get_dest_and_tempdir(dest_directory, replace, backend_key):
if dest_directory is None and replace:
_temp_dir = tempfile.TemporaryDirectory()
dest_directory = _temp_dir.name
else:
_temp_dir = False

if os.path.basename(os.path.normpath(dest_directory)) != backend_key:
# The new folder must carry the backend key. If the destination does
# not already end in it, append the backend_key as a sub-folder.
print(f'Will write to {dest_directory} and make sub-folder {backend_key}')
dest_directory = os.path.join(dest_directory, backend_key)
return dest_directory, _temp_dir


def _move_directories(replace, source_directory, dest_directory, _temp_dir):
if replace:
print(f'move {dest_directory} to {source_directory}')
shutil.rmtree(source_directory)
shutil.move(dest_directory, source_directory)
if _temp_dir:
_temp_dir.cleanup()


def _exhaust_generator(executor, saver, load_wrapper, data_loader, rechunk, _timeout):
if executor is None:
saver.save_from(load_wrapper(data_loader), rechunk=rechunk)
return

mailbox = strax.Mailbox(name='rechunker', timeout=_timeout)
mailbox.add_sender(data_loader)
mailbox.add_reader(partial(
saver.save_from,
executor=executor,
rechunk=rechunk,
))
final_generator = mailbox.subscribe()

# Make sure everything is added to the mailbox before starting!
mailbox.start()
for _ in load_wrapper(final_generator):
pass

mailbox.cleanup()
executor.shutdown(wait=True)


def _get_meta_data(backend, source_directory, compressor, target_size_mb):
meta_data = backend.get_metadata(source_directory)
if compressor is not None:
meta_data['compressor'] = compressor
if target_size_mb is not None:
meta_data['chunk_target_size_mb'] = target_size_mb
return meta_data


def _get_executor(parallel, max_workers):
# nested import - prevent circular imports
from strax.processor import SHMExecutor
return {
True: ThreadPoolExecutor(max_workers),
'thread': ThreadPoolExecutor(max_workers),
'process': ProcessPoolExecutor(max_workers) if SHMExecutor is None else SHMExecutor(max_workers),
}.get(parallel)
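The `Mailbox` wiring in `_exhaust_generator` is the core of this PR: the loader generator is registered as a sender, the saver as a reader running in the executor, and the main thread subscribes to the same stream to drive the pipeline. A stripped-down sketch of that pattern with a toy producer and consumer (`produce`/`consume` are illustrative names, not strax API):

```python
import strax

mailbox = strax.Mailbox(name='demo', timeout=60)

def produce():
    # Sender: every yielded item is fanned out to all readers/subscribers.
    yield from range(3)

def consume(source):
    # Reader: runs in its own thread, exhausting the messages it receives.
    for msg in source:
        print(f'consumed {msg}')

mailbox.add_sender(produce())
mailbox.add_reader(consume)
subscription = mailbox.subscribe()  # main-thread view of the same stream

mailbox.start()           # start only after all senders/readers are wired up
for _ in subscription:    # draining the subscription keeps the pipeline moving
    pass
mailbox.cleanup()
```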
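End to end, the new public entry point is `strax.rechunker`, which returns the report dictionary assembled above. A short usage sketch (paths hypothetical, compressor assumed available):

```python
import strax

# Hypothetical paths; the source folder must contain a single strax datatype.
report = strax.rechunker(
    source_directory='./strax_data/012345-raw_records-abcdefgh',
    dest_directory='./rechunked',
    compressor='zstd',
    target_size_mb=500,
    parallel='thread',
    max_workers=4,
)
print(f"load: {report['load_time']:.1f} s, write: {report['write_time']:.1f} s, "
      f"uncompressed: {report['uncompressed_mb']:.0f} MB")
```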