Rechunker script (#686)
* Rechunker script

* Update rechunker

* update tool

* fix codefactor

* update testss

* fix typo

* fix

* add a docstring

* Add reference

* also point to more info

* Add backend_key

* add to bin

* update target size

* disable codefactor

* move

* fix iterator

* update documentation

* fix docs
JoranAngevaare authored May 6, 2022
1 parent 72af4ca commit f331fb0
Showing 7 changed files with 356 additions and 4 deletions.
85 changes: 85 additions & 0 deletions bin/rechunker
@@ -0,0 +1,85 @@
#!/usr/bin/env python
import os.path
import argparse
import strax
import pandas as pd


def parse_args():
    parser = argparse.ArgumentParser(
        description="Rechunker for FileSytemBackend. Interfaces with strax.rechunker. "
                    "Please see the documentation of strax.rechunker for more information: "
                    "github.com/AxFoundation/strax/blob/31c114c5f8329e53289d5127fb2125e71c3d6aae/strax/storage/files.py#L371")
    parser.add_argument(
        '--source',
        type=str,
        help="Target directory to rechunk, should be a folder in a "
             "strax.DataDirectory (one datatype)")
    parser.add_argument(
        '--dest', '--destination',
        default=None,
        dest='dest',
        type=str,
        help="Where to store rechunked data. If nothing is specified, replace the source.",
    )
    parser.add_argument(
        '--compressor',
        choices=list(strax.io.COMPRESSORS.keys()),
        help="Recompress using one of these compressors. If nothing is specified, "
             "use the same compressor as the source")
    parser.add_argument(
        '--rechunk',
        default=True,
        choices=[True, False],
        type=bool,
        help="Rechunk the data")
    parser.add_argument(
        '--target_size_mb', '--target-size-mb',
        dest='target_size_mb',
        type=int,
        default=strax.default_chunk_size_mb,
        help="Target size (MB, uncompressed) of the rechunked data")
    parser.add_argument(
        '--write_stats_to', '--write-stats-to',
        dest='write_stats_to',
        type=str,
        default=None,
        help="Write some information to this file (csv format)")
    args = parser.parse_args()
    return args


def main():
    args = parse_args()
    source_mb = strax.utils.dir_size_mb(args.source)
    report = strax.rechunker(source_directory=args.source,
                             dest_directory=args.dest,
                             replace=args.dest is None,
                             compressor=args.compressor,
                             target_size_mb=args.target_size_mb,
                             rechunk=args.rechunk,
                             )
    if args.dest is not None:
        recompressed_mb = strax.utils.dir_size_mb(args.dest)
    else:
        recompressed_mb = strax.utils.dir_size_mb(args.source)
    report.update(dict(source_mb=source_mb,
                       dest_mb=recompressed_mb))
    if args.write_stats_to:
        if os.path.exists(args.write_stats_to):
            df = pd.read_csv(args.write_stats_to)
        else:
            df = pd.DataFrame()
        df_new = pd.concat(
            [df,
             pd.DataFrame({k: [v] for k, v in report.items()})])
        df_new.to_csv(args.write_stats_to, index=False)

    print(f'Re-compressed {args.source}')
    for k, v in report.items():
        print(f'\t{k:16}\t{v}')


if __name__ == '__main__':
    main()
68 changes: 66 additions & 2 deletions docs/source/advanced/recompression.rst
@@ -1,8 +1,12 @@
Recompressing & moving data
===========================
There are two options for recompressing data:

- via the context method :py:func:`context.copy_to_frontend`
- via a dedicated ``rechunker`` script, which works only for filesystem backends and operates outside of any context.

In order to recompress data with another compression algorithm, the
:py:func:`context.copy_to_frontend` function can be used.
The function works on a per-run_id, per-datatype basis. In the example
below, peaks data is copied to a second frontend.
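
A minimal sketch of what such a call can look like (assuming a context
``st`` with two ``strax.DataDirectory`` frontends and peaks data already
available; the run id is a placeholder — see the full example below for
details):

.. code-block:: python

    import strax

    st = strax.Context(
        storage=[strax.DataDirectory('./folder_A'),
                 strax.DataDirectory('./folder_B')],
    )
    # Copy the peaks of this run to the second frontend (index 1),
    # recompressing with zstd along the way.
    st.copy_to_frontend('009104',
                        'peaks',
                        target_frontend_id=1,
                        target_compressor='zstd')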


@@ -134,4 +138,64 @@ re-writing the data to disk (as done for folder_C in the example above).
As such, for further use, it does not matter whether the data comes from
any of folder_A-folder_C, as the metadata will tell strax which
compressor to use. Different compressors may have different
performance for loading/writing data.
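
As an illustration, the compressor actually used for some stored data
can be read back from its metadata (a hedged sketch; ``st`` is a context
as above and the run/datatype names are placeholders):

.. code-block:: python

    # The metadata records which compressor wrote the data, so strax
    # can load it from any of the folders without extra configuration.
    md = st.get_metadata('009104', 'peaks')
    print(md['compressor'])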

Rechunker script
================
From strax v1.2.2 onwards, a ``rechunker`` script is automatically installed with strax.
It can be used to re-write data in the ``FileSystem`` backend.


For example:

.. code-block:: bash

    rechunker --source 009104-raw_records_aqmon-rfzvpzj4mf --compressor zstd

will output:


.. code-block:: rst

    Will write to /tmp/tmpoj0xpr78 and make sub-folder 009104-raw_records_aqmon-rfzvpzj4mf
    Rechunking 009104-raw_records_aqmon-rfzvpzj4mf to /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf
    move /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf to 009104-raw_records_aqmon-rfzvpzj4mf
    Re-compressed 009104-raw_records_aqmon-rfzvpzj4mf
        backend_key         009104-raw_records_aqmon-rfzvpzj4mf
        load_time           0.4088103771209717
        write_time          0.07699322700500488
        uncompressed_mb     1.178276
        source_compressor   zstd
        dest_compressor     zstd
        source_mb           0.349217
        dest_mb             0.349218
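
The ``rechunker`` script is a thin wrapper around :py:func:`strax.rechunker`,
so the same operation can also be done directly from Python (a minimal
sketch; the folder name is the same placeholder as above):

.. code-block:: python

    import strax

    report = strax.rechunker(
        source_directory='009104-raw_records_aqmon-rfzvpzj4mf',
        replace=True,       # rewrite the source folder in place
        compressor='zstd',  # recompress while rewriting
        target_size_mb=strax.default_chunk_size_mb,
    )
    print(report)
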
Using the script to profile write/read rates for compressors
-------------------------------------------------------------

This script can easily be used to profile different compressors:

.. code-block:: bash

    for COMPRESSOR in zstd bz2 lz4 blosc zstd; \
        do echo $COMPRESSOR; \
        rechunker \
            --source 009104-raw_records-rfzvpzj4mf \
            --write_stats_to test.csv \
            --compressor $COMPRESSOR; \
        done

We can check the output in Python using:

.. code-block:: python

    >>> import pandas as pd
    >>> df = pd.read_csv('test.csv')
    >>> df['read_mbs'] = df['uncompressed_mb'] / df['load_time']
    >>> df['write_mbs'] = df['uncompressed_mb'] / df['write_time']
    >>> print(df[['source_compressor', 'read_mbs', 'dest_compressor', 'write_mbs']].to_string())
      source_compressor    read_mbs dest_compressor   write_mbs
    0              zstd  313.922890            zstd  298.429123
    1              zstd  284.530054             bz2    8.932259
    2               bz2   20.289876             lz4  228.932498
    3               lz4  372.491150           blosc  433.494794
    4             blosc  725.154966            zstd  215.765177
3 changes: 3 additions & 0 deletions setup.py
@@ -34,6 +34,9 @@ def open_requirements(path):
    extras_require={
        'docs': docs_requires
    },
    scripts=[
        'bin/rechunker',
    ],
    long_description_content_type="text/markdown",
    packages=setuptools.find_packages() + ['extra_requirements'],
    package_dir={'extra_requirements': 'extra_requirements'},
15 changes: 14 additions & 1 deletion strax/context.py
@@ -1855,11 +1855,24 @@ def copy_to_frontend(self,
                # Need to load a new loader each time since it's a generator
                # and will be exhausted otherwise.
                loader = s_be.loader(s_be_key)

                def wrapped_loader():
                    """Wrapped loader for changing the target_size_mb"""
                    while True:
                        try:
                            # pylint: disable=cell-var-from-loop
                            data = next(loader)
                            # Update target chunk size for re-chunking
                            data.target_size_mb = md['chunk_target_size_mb']
                        except StopIteration:
                            return
                        yield data

                # Fill the target buffer
                t_be_str, t_be_key = t_sf.find(data_key, write=True)
                target_be = t_sf._get_backend(t_be_str)
                saver = target_be._saver(t_be_key, md)
                saver.save_from(wrapped_loader(), rechunk=rechunk)
            except NotImplementedError:
                # Target is not susceptible
                continue
111 changes: 110 additions & 1 deletion strax/storage/files.py
Expand Up @@ -3,7 +3,8 @@
import tempfile
import os
import os.path as osp

import typing
import time
from bson import json_util
import shutil

@@ -364,3 +365,111 @@ def _close(self):
@export
class InvalidFolderNameFormat(Exception):
    pass


@export
def rechunker(source_directory: str,
              dest_directory: typing.Optional[str] = None,
              replace: bool = False,
              compressor: typing.Optional[str] = None,
              target_size_mb: typing.Optional[int] = None,
              rechunk: bool = True,
              ) -> dict:
    """
    Rechunk/recompress a strax datatype saved in a FileSystemBackend
    outside of a strax.Context. For a user-friendly, context-centered
    alternative, see strax.Context.copy_to_frontend:
    github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa

    One can either specify a destination directory where to store a new
    copy of this data with <dest_directory>, or replace the input folder
    with its rechunked version.

    This function can either:
     - rechunk (if <rechunk> is True); increasing <target_size_mb> is
       then usually also useful (to create larger chunks)
     - recompress (if a <compressor> is specified)
    One can also rechunk and recompress simultaneously.

    :param source_directory: Path to a folder containing a single
        strax.DataType.
    :param dest_directory: Head of a folder whereto write new data. If
        nothing is specified, write to a temporary directory.
    :param replace: Delete the source_directory and replace it with its
        rechunked version.
    :param compressor: Compressor to be used in saving the rechunked
        data.
    :param target_size_mb: Target size of chunks (uncompressed). As long
        as a chunk is smaller than this many MB, keep adding new MBs
        until the chunk is at least target_size_mb or we run out of
        chunks.
    :param rechunk: Do we want to rechunk?
    :return: Dictionary with some information on the write/load times
        involved.
    """
    if not os.path.exists(source_directory):
        raise FileNotFoundError(f'No file at {source_directory}')
    if not replace and dest_directory is None:
        raise ValueError('Specify a destination path <dest_directory> when '
                         'not replacing the original path')
    backend_key = os.path.basename(os.path.normpath(source_directory))

    if dest_directory is None and replace:
        _temp_dir = tempfile.TemporaryDirectory()
        dest_directory = _temp_dir.name
    else:
        _temp_dir = False

    if os.path.basename(os.path.normpath(dest_directory)) != backend_key:
        # The new key should be correct! If there is not an exact match,
        # we want to make sure that we append the backend_key correctly
        print(f'Will write to {dest_directory} and make sub-folder {backend_key}')
        dest_directory = os.path.join(dest_directory, backend_key)
    backend = strax.FileSytemBackend()
    meta_data = backend.get_metadata(source_directory)
    source_compressor = meta_data['compressor']

    if compressor is not None:
        meta_data['compressor'] = compressor
    if target_size_mb is not None:
        meta_data['chunk_target_size_mb'] = target_size_mb

    data_loader = backend.loader(source_directory)

    load_time_seconds = []

    def loader():
        """Wrapped loader for bookkeeping load time"""
        while True:
            try:
                t0 = time.time()
                data = next(data_loader)
                # Update target chunk size for re-chunking
                data.target_size_mb = meta_data['chunk_target_size_mb']
                load_time_seconds.append(time.time() - t0)
            except StopIteration:
                return
            yield data

    print(f'Rechunking {source_directory} to {dest_directory}')
    saver = backend._saver(dest_directory, metadata=meta_data)

    write_time_start = time.time()
    saver.save_from(loader(), rechunk=rechunk)
    load_time = sum(load_time_seconds)
    write_time = time.time() - write_time_start - load_time

    if replace:
        print(f'move {dest_directory} to {source_directory}')
        shutil.rmtree(source_directory)
        shutil.move(dest_directory, source_directory)
        if _temp_dir:
            _temp_dir.cleanup()

    return dict(backend_key=backend_key,
                load_time=load_time,
                write_time=write_time,
                uncompressed_mb=sum(x['nbytes'] for x in meta_data['chunks']) / 1e6,
                source_compressor=source_compressor,
                dest_compressor=meta_data['compressor'],
                )
20 changes: 20 additions & 0 deletions strax/utils.py
@@ -18,6 +18,7 @@
import pandas as pd
from collections.abc import Mapping
from warnings import warn
import os


# Change numba's caching backend from pickle to dill
@@ -696,3 +697,22 @@ def apply_selection(x,
    del x2

    return x


def dir_size_mb(start_path='.'):
    """
    Calculate the total size (in MB) of all files in start_path.
    Thanks https://stackoverflow.com/a/1392549/18280620
    """
    if not os.path.exists(start_path):
        return 0

    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # Skip if it is a symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)

    return total_size / 1e6