Source code for amstrax.contexts

import os
from datetime import timezone

import strax
from immutabledict import immutabledict

import amstrax as ax

common_opts = dict(
    # register_all=[ax.pulse_processing,
    #               ax.peak_processing,
    #               ax.event_processing],
    register=[ax.DAQReader],
    store_run_fields=(
        'name', 'number',
        'start', 'end', 'livetime',
        'processing_status',
        'tags'),
    check_available=('raw_records_v1730',
                     'raw_records_v1724',
                     ),
    free_options=('live_data_dir',),
)

# xamsl and xams are too similar
xams_little_common_config = dict(
    live_data_dir='/data/xenon/xamsl/live_data',
    n_tpc_pmts=4,
    channel_map=immutabledict(
        v1730=(0, 2),
        v1724=(2, 4),
        aqmon=(40, 41),  # register strax deadtime
    ))

xams_common_config = dict(
    live_data_dir='/data/xenon/xams/live_data',  # doesn't work yet
    n_tpc_pmts=16,
    channel_map=immutabledict(
        # NB! Not the same as XAMSL
        v1724=(0, 8),
        v1730=(8, 16),
        aqmon=(40, 41),  # register strax deadtime
    ))


[docs]def xams(*args, **kwargs): if '_detector' in kwargs: raise ValueError('Don\'t specifify _detector!') mongo_kwargs = dict(mongo_collname='runs_gas', runid_field='number', mongo_dbname='run', ) st = _xams_xamsl_context(*args, **kwargs, _detector='xamsl', mongo_kwargs=mongo_kwargs) st.set_config(xams_common_config) return st
[docs]def xams_little(*args, **kwargs): if '_detector' in kwargs: raise ValueError('Don\'t specifify _detector!') mongo_kwargs = dict(mongo_collname='runs_new', runid_field='number', mongo_dbname='run', ) st = _xams_xamsl_context(*args, **kwargs, _detector='xamsl', mongo_kwargs=mongo_kwargs) st.set_config(xams_little_common_config) return st
def _xams_xamsl_context( output_folder='./amstrax_data', raw_data_folder='/data/xenon/{detector}/raw/', processed_data_folder='/data/xenon/{detector}/processed/', _detector='xams', init_rundb=True, mongo_kwargs: dict = None ): st = strax.Context(**common_opts, forbid_creation_of=ax.DAQReader.provides, ) raw_data_folder = raw_data_folder.format(detector=_detector) processed_data_folder = processed_data_folder.format(detector=_detector) for p in [raw_data_folder, processed_data_folder]: if not os.path.exists(p): UserWarning(f'Context for {_detector}, folder {p} does not exist?!') st.storage = [] if init_rundb: if mongo_kwargs is None: raise RuntimeError('You need to provide mongo-kwargs!') st.storage = [ax.RunDB( **mongo_kwargs, provide_run_metadata=True, )] st.storage += [ strax.DataDirectory(raw_data_folder, provide_run_metadata=False, take_only=ax.DAQReader.provides, deep_scan=False, readonly=True), strax.DataDirectory(processed_data_folder, provide_run_metadata=False, deep_scan=False, readonly=True), strax.DataDirectory(output_folder), ] print(st.storage) return st
[docs]def amstrax_gas_test_analysis(): """Return strax test for analysis of Xams gas test data""" UserWarning("Unsure if this context is complete and/or working") return strax.Context( storage=[ ax.RunDB( mongo_url=f'mongodb://{os.environ["user"]}:{os.environ["password"]}@127.0.0.1:27017/admin', mongo_collname='runs_gas', runid_field='number', mongo_dbname='run'), strax.DataDirectory('/data/xenon/xams/strax_processed_gas/', provide_run_metadata=False, deep_scan=False, readonly=True), strax.DataDirectory('/data/xenon/xams/strax_processed_peaks/', provide_run_metadata=False, deep_scan=False, readonly=False, )], forbid_creation_of='raw_records', **common_opts, )
[docs]def amstrax_gas_test_analysis_alt_baseline(): """Return strax test for analysis of Xams gas test data""" UserWarning("Unsure if this context is complete and/or working") return strax.Context( storage=[ strax.DataDirectory('/data/xenon/xams/strax_processed_gas/', provide_run_metadata=False, deep_scan=False, readonly=False), strax.DataDirectory('/data/xenon/xams/strax_processed_peaks/', provide_run_metadata=False, deep_scan=False, readonly=True, )], forbid_creation_of='raw_records', register_all=[ax.daqreader, ax.pulse_processing_alt_baseline], store_run_fields=( 'name', 'number', 'start', 'end', 'livetime', 'tags'), )
[docs]def amstrax_run10_analysis(output_folder='./strax_data'): """Return strax test for analysis of Xams gas test data""" UserWarning("Unsure if this context is complete and/or working") return strax.Context( storage=[ strax.DataDirectory(f'{output_folder}', provide_run_metadata=False, deep_scan=False, readonly=False), ], config=dict(**xams_common_config), register=ax.RecordsFromPax, **common_opts )
[docs]def context_for_daq_reader(st: strax.Context, run_id: str, runs_col_kwargs: dict = None, run_doc: dict = None, check_exists=True, ): """ Given a context and run_id, change the options such that we can process the live data. IMPORTANT: After setting the context, we specify the location of the live-data for a single run. This means you CANNOT re-use this context! Therefore, if you want to process data, you should start a new context if you want to process another run starting from the live data :param st: Context to change :param run_id: the run_id of the run that should be processed :param runs_col_kwargs: Optional options (kwargs) for starting the run-collection, see `get_mongo_collection` :param run_doc: Optional document associated with this run-id. :return: Context ready to start processing <run_id> with from the live-data """ if check_exists and _check_raw_records_exists(st, run_id): raise ValueError(f'raw data is stored for {run_id} disable check by ' f'setting "check_exists=False"') if runs_col_kwargs is None: runs_col_kwargs = {} if run_doc is None: run_col = ax.get_mongo_collection(**runs_col_kwargs) run_doc = run_col.find_one({'number': int(run_id)}) daq_config = run_doc['daq_config'] live_dir = st.config['live_data_dir'] input_dir = os.path.join(live_dir, run_id) if not os.path.exists(input_dir): raise FileNotFoundError(f'No path at {input_dir}') st.set_context_config(dict(forbid_creation_of=tuple())) st.set_config( {'readout_threads': daq_config['processing_threads'], 'daq_input_dir': input_dir, 'record_length': daq_config['strax_fragment_payload_bytes'] // 2, 'max_digitizer_sampling_time': 10, 'run_start_time': run_doc['start'].replace(tzinfo=timezone.utc).timestamp(), 'daq_chunk_duration': int(daq_config['strax_chunk_length'] * 1e9), 'daq_overlap_chunk_duration': int(daq_config['strax_chunk_overlap'] * 1e9), 'compressor': daq_config.get('compressor', 'lz4') }) UserWarning(f'You changed the context for {run_id}. Do not process any other run!') return st
def _check_raw_records_exists(st: strax.Context, run_id: str) -> bool: for plugin_name in st._plugin_class_registry.keys(): if 'raw' in plugin_name: if st.is_stored(run_id, plugin_name): return True return False