extra updates - resolve conflicts (#1137)
Co-authored-by: Joran Angevaare <j.angevaare@nikhef.nl>
JoranAngevaare and Joran Angevaare authored Feb 9, 2023
1 parent 3560990 commit 4361e51
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions bin/restrax
@@ -12,7 +12,7 @@ How to use
For more info, see the documentation:
https://straxen.readthedocs.io/en/latest/scripts.html
"""
__version__ = '0.0.3'
__version__ = '0.1.0'

import argparse
import logging
@@ -43,13 +43,13 @@ def parse_args():
parser.add_argument('--max_threads', type=int, default=1,
help='max number of threads to simultaneously work on one run'
)
parser.add_argument('--skip_compression', nargs='*',
default=[],
help='skip recompression of any datatype that fnmatches. For example: "*event* *peak*"'
)
parser.add_argument('--skip_compression', nargs='*', default=['*event*'],
help='skip recompression of any datatype that fnmatches. For example: "*event* *peak*"')
# deep_compare is meant only for testing
parser.add_argument('--deep_compare', action='store_true',
help='open all the data of the old and the new format and check that they are the same'
)
help='Open all the data of the old and the new format and check that they are the same')
parser.add_argument('--recompress_min_chunks', type=int, default=3,
help='Only bother with doing the recompression if there are more than this many chunks')
actions = parser.add_mutually_exclusive_group()
actions.add_argument('--undying', action='store_true',
help="Except any error and ignore it")
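A minimal sketch of how the fnmatch-style patterns given to --skip_compression select datatypes; the datatype names below are illustrative, not taken from this diff:

    import fnmatch

    skip_patterns = ['*event*', '*peak*']  # as would be passed to --skip_compression
    for dtype in ['event_basics', 'peaklets', 'raw_records']:  # illustrative names
        skipped = any(fnmatch.fnmatch(dtype, pattern) for pattern in skip_patterns)
        print(dtype, 'skipped' if skipped else 'recompressed')
    # event_basics and peaklets match a pattern; raw_records would still be recompressed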
@@ -113,9 +113,12 @@ class ReStrax(daq_core.DataBases):
'raw_records_he',
'records', 'records_nv', 'records_mv')

# If a dataset is larger than this many bytes, only compare a single field
# If a dataset is larger than this many bytes, only compare a single field (time)
large_data_compare_threshold = 5e9

# If the total data rate is lower than this, use bz2 for the compression of raw records.
bz2_compression_threshold_mbs = 50 # MB/s

def __init__(self, args):
super().__init__(production=args.production)

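The comparison code itself is outside this hunk; as a hedged illustration of what large_data_compare_threshold controls (compare only the time field once a dataset exceeds the threshold), with made-up numpy arrays standing in for the old and new copies of a dataset:

    import numpy as np

    old = np.zeros(5, dtype=[('time', np.int64), ('area', np.float32)])
    new = old.copy()

    large_data_compare_threshold = 5e9
    fields = ['time'] if old.nbytes > large_data_compare_threshold else list(old.dtype.names)
    assert all(np.array_equal(old[field], new[field]) for field in fields)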
@@ -129,6 +132,7 @@ class ReStrax(daq_core.DataBases):
self.skip_compression = args.skip_compression
self.process = args.process
self.deep_compare = args.deep_compare
self.recompress_min_chunks = args.recompress_min_chunks

def infinite_loop(self, close=False) -> None:
"""Core of restrax, recompress the data followed by several validation steps"""
@@ -179,7 +183,7 @@ class ReStrax(daq_core.DataBases):
def _find_testing_work(self, projection: dict) -> ty.Optional[dict]:
"""Find work from the pre-dir if we are testing"""
folders = os.listdir(self.read_from)
first_run = None
first_run = self.process
data_docs = []
for folder in folders:
if os.path.exists(os.path.join(self.write_to, folder)):
@@ -233,8 +237,7 @@ class ReStrax(daq_core.DataBases):

def run_software_veto(self, run_doc):
"""This is where we can add a software veto for specific runs"""
# Currently nothing is implemented
pass
raise NotImplementedError

def rechunk_docs(self, docs: ty.List[dict]) -> None:
"""
@@ -257,7 +260,7 @@ class ReStrax(daq_core.DataBases):
for ddoc in docs:
self._rechunk_per_doc(ddoc)

def _rechunk_per_doc(self, data_doc):
def _rechunk_per_doc(self, run_doc, data_doc):
"""Do the rechunking document by document"""
dir_in = data_doc['location']
dir_out = self.renamed_path(dir_in)
@@ -276,7 +279,7 @@ class ReStrax(daq_core.DataBases):
return

self.log.info(f'Start {dir_in} -> {dir_out}')
compressor, target_size_mb = self.get_compressor_and_size(data_doc)
compressor, target_size_mb = self.get_compressor_and_size(run_doc, data_doc)

summary = strax.rechunker(
source_directory=dir_in,
@@ -294,19 +297,21 @@ class ReStrax(daq_core.DataBases):
self.log.info(f'{dir_out} written {summary}')

def _sleep_while_n_threads_alive(self, threads, n):
while (n_alive := sum(t.is_alive() for t in threads)) > n:
while sum(t.is_alive() for t in threads) > n:
self.take_a_nap(self.nap_time_short)

def get_compressor_and_size(self, data_doc: dict) -> ty.Tuple[str, ty.Optional[int]]:
def get_compressor_and_size(self, run_doc: dict, data_doc: dict) -> ty.Tuple[str, ty.Optional[int]]:
"""
For a given data document infer the desired compressor, and target size
:param run_doc: run document
:param data_doc: data document
"""
# This is where we might do some fancy coding
dtype = data_doc['type']
if dtype in self.raw_record_types:
compressor = 'bz2'
rate = sum(detector.get('avg', 100) for detector in run_doc.get('rate', {'none': {}}).values())
compressor = 'bz2' if rate < self.bz2_compression_threshold_mbs else 'zstd'
target_size_mb = 5000
elif 'peak' in dtype:
compressor = 'zstd'
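A rough sketch of the rate-based compressor choice introduced above; the run_doc fragment and its per-detector 'avg' rates (MB/s) are assumptions for illustration, not taken from the runs-database schema:

    run_doc = {'rate': {'tpc': {'avg': 30.0}, 'neutron_veto': {'avg': 5.0}}}  # hypothetical
    bz2_compression_threshold_mbs = 50  # class attribute introduced in this commit

    rate = sum(detector.get('avg', 100) for detector in run_doc.get('rate', {'none': {}}).values())
    compressor = 'bz2' if rate < bz2_compression_threshold_mbs else 'zstd'
    print(rate, compressor)  # total of 35.0 MB/s is below the threshold -> 'bz2'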
@@ -326,8 +331,14 @@ def needs_skip(self, data_doc: dict) -> bool:
"""
if data_doc['type'] == 'live_data':
return True
return any(fnmatch.fnmatch(data_doc['type'], delete)
for delete in self.skip_compression)
if any(fnmatch.fnmatch(data_doc['type'], delete)
for delete in self.skip_compression):
return True
n_chunks = len(strax.FileSytemBackend().get_metadata(data_doc['location']).get('chunks', []))
if n_chunks <= self.recompress_min_chunks:
# too few chunks to make recompression worthwhile
return True
return False

def do_checks(self, data_docs: ty.List[dict]) -> None:
"""

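A minimal sketch of the new chunk-count guard in needs_skip, assuming a stored strax data folder exists at the (hypothetical) location below; FileSytemBackend is the actual class name in strax:

    import strax

    location = './012345-records-abcdefgh'  # hypothetical data_doc['location']
    recompress_min_chunks = 3

    metadata = strax.FileSytemBackend().get_metadata(location)
    n_chunks = len(metadata.get('chunks', []))
    if n_chunks <= recompress_min_chunks:
        print(f'only {n_chunks} chunks, skip recompression')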