Source code for amstrax.auto_processing.amstraxer

#!/usr/bin/env python
"""
Process a single run with amstrax
"""
import argparse
import datetime
import json
import logging
import os
import os.path as osp
import platform
import sys
import time

import psutil


[docs]def parse_args(): parser = argparse.ArgumentParser( description='Process a single run with amstrax', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( 'run_id', metavar='RUN_ID', type=str, help="ID of the run to process; usually the run name.") parser.add_argument( '--context', default='xams_little', help="Name of context to use") parser.add_argument( '--target', default='raw_records', help='Target final data type to produce') parser.add_argument( '--from_scratch', action='store_true', help='Start processing at raw_records, regardless of what data is available. ' 'Saving will ONLY occur to ./strax_data! If you already have the target' 'data in ./strax_data, you need to delete it there first.') parser.add_argument( '--config_kwargs', type=json.loads, help='Use a json-dict to set the context to. For example:' '--config_kwargs ' '\'{' '"allow_multiprocess": true, ' '"max_messages":4, ' '"allow_shm": true, ' '"allow_lazy": true}\'' ) parser.add_argument( '--testing_rundoc', type=json.loads, help='This is only used for testing, do not use!' ) parser.add_argument( '--context_kwargs', type=json.loads, help='Use a json-file to load the context with (see config_kwargs for JSON-example)') parser.add_argument( '--timeout', default=None, type=int, help="Strax' internal mailbox timeout in seconds") parser.add_argument( '--workers', default=1, type=int, help=("Number of worker threads/processes. " "Strax will multithread (1/plugin) even if you set this to 1.")) parser.add_argument( '--debug', action='store_true', help="Enable debug logging to stdout") parser.add_argument( '--build_lowlevel', action='store_true', help='Build low-level data even if the context forbids it.') return parser.parse_args()
[docs]def main(args): logging.basicConfig( level=logging.DEBUG if args.debug else logging.INFO, format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s') print(f"Starting processing of run {args.run_id} until {args.target}") print(f"\tpython {platform.python_version()} at {sys.executable}") # These imports take a bit longer, so it's nicer # to do them after argparsing (so --help is fast) import strax print(f"\tstrax {strax.__version__} at {osp.dirname(strax.__file__)}") import amstrax print(f"\tamstrax {amstrax.__version__} at {osp.dirname(amstrax.__file__)}") if args.context_kwargs: logging.info(f'set context kwargs {args.context_kwargs}') st = getattr(amstrax.contexts, args.context)(**args.context_kwargs) else: st = getattr(amstrax.contexts, args.context)() if args.config_kwargs: logging.info(f'set context options to {args.config_kwargs}') st.set_config(to_dict_tuple(args.config_kwargs)) if args.timeout is not None: st.context_config['timeout'] = args.timeout if args.build_lowlevel: st.context_config['forbid_creation_of'] = tuple() if 'raw_records' in args.target: # Only for testing! testing_rd = args.testing_rundoc if testing_rd is not None: testing_rd['start'] = datetime.datetime.now() st = amstrax.contexts.context_for_daq_reader(st, args.run_id, run_doc=testing_rd) if args.from_scratch: for q in st.storage: q.take_only = ('raw_records',) st.storage.append( strax.DataDirectory('./strax_data', overwrite='always', provide_run_metadata=False)) if st.is_stored(args.run_id, args.target): print("This data is already available.") return 1 try: md = st.run_metadata(args.run_id) except strax.RunMetadataNotAvailable: logging.warning('Using dummy timestamps') md = {} md['end'] = datetime.datetime.now() md['start'] = md['end'] - datetime.timedelta(seconds=360) t_start = md['start'].replace(tzinfo=datetime.timezone.utc).timestamp() t_end = md['end'].replace(tzinfo=datetime.timezone.utc).timestamp() run_duration = t_end - t_start st.config['run_start_time'] = md['start'].timestamp() st.context_config['free_options'] = tuple( list(st.context_config['free_options']) + ['run_start_time']) process = psutil.Process(os.getpid()) peak_ram = 0 def get_results(): kwargs = dict( run_id=args.run_id, targets=args.target, max_workers=int(args.workers)) yield from st.get_iter(**kwargs) clock_start = 0 for i, d in enumerate(get_results()): mem_mb = process.memory_info().rss / 1e6 peak_ram = max(mem_mb, peak_ram) if len(d) == 0: print(f"Got chunk {i}, but it is empty! Using {mem_mb:.1f} MB RAM.") continue # Compute detector/data time left t = d.end / 1e9 dt = t - t_start time_left = t_end - t msg = (f"Got {len(d)} items. " f"Now {dt:.1f} sec / {100 * dt / run_duration:.1f}% into the run. " f"Using {mem_mb:.1f} MB RAM. ") if clock_start is not None: # Compute processing job clock time left d_clock = time.time() - clock_start clock_time_left = time_left / (dt / d_clock) msg += f"ETA {clock_time_left:.2f} sec." else: clock_start = time.time() print(msg, flush=True) print(f"\nAmstraxer is done! " f"We took {time.time() - clock_start:.1f} seconds, " f"peak RAM usage was around {peak_ram:.1f} MB.")
[docs]def to_dict_tuple(res: dict): """Convert list configs to tuple configs""" res = res.copy() for k, v in res.copy().items(): if type(v) == list: # Remove lists to tuples res[k] = tuple(_v if type(_v) != list else tuple(_v) for _v in v) return res
if __name__ == '__main__': args = parse_args() sys.exit(main(args))