import os
from datetime import timezone
import strax
from immutabledict import immutabledict
import amstrax as ax
# Configuration
CONFIG = {"DEFAULT_DETECTOR": "xams", "DEFAULT_RUNCOLNAME": "run", "DEFAULT_COLLECTION": "runs_gas"}
PATHS_TO_REGISTER = [
# The path for raw records on nikhef
"/data/xenon/xams_v2/xams_raw_records",
# The path for processed data on nikhef
"/data/xenon/xams_v2/xams_processed",
# The path for raw records on the xams server
"/home/xams/data/xams_processed",
]
COMMON_OPT_XAMS = dict(
register_all=[],
register=[
ax.DAQReader,
ax.PulseProcessing,
# Peaks
ax.Peaks,
ax.PeakBasics,
ax.PeakPositions,
# Events
ax.Events,
ax.EventBasics,
ax.EventPositions,
ax.EventCoincidences,
ax.CorrectedAreas,
ax.EventInfo,
ax.EventWaveform,
ax.EventAreaPerChannel,
# External PMT plugins
ax.PulseProcessingEXT,
ax.PeaksEXT,
ax.PeakBasicsEXT,
# SiPMT plugins
ax.PulseProcessingSiPM,
ax.PeaksSiPM,
ax.PeakBasicsSiPM,
# Coincidences
ax.PeakCoincidences,
# LED plugins not default
# ax.RecordsLED,
# ax.LEDCalibration,
],
store_run_fields=("name", "number", "start", "end", "livetime", "processing_status", "tags"),
check_available=(),
free_options=("live_data_dir",),
)
XAMS_COMMON_CONFIG = dict(
n_tpc_pmts=5,
# Fallback-only channel map.
# Authoritative per-run channel maps should come from rundoc
# (xams_bookkeeping.channel_map, or daq_config.channel_map as legacy fallback)
# and override this default at runtime.
channel_map=immutabledict(
bottom=(0, 0),
top=(1, 4),
external=(5,5),
sipm=(6, 6),
aqmon=(40, 40), # register strax deadtime
),
)
[docs]def xams(
output_folder="./strax_data",
init_rundb=True,
mongo_kwargs=dict(
mongo_collname=CONFIG["DEFAULT_COLLECTION"], mongo_dbname=CONFIG["DEFAULT_RUNCOLNAME"], runid_field="number"
),
corrections_version=None,
*args,
**kwargs,
):
st = strax.Context(
**COMMON_OPT_XAMS,
forbid_creation_of=ax.DAQReader.provides,
# output_folder it will be set again later, but so we do not create a folder /strax_data everywhere
storage=output_folder,
)
st.set_config(XAMS_COMMON_CONFIG)
st.storage = []
if init_rundb:
if mongo_kwargs is None:
raise RuntimeError("You need to provide mongo-kwargs!")
st.storage = [
ax.RunDB(
**mongo_kwargs,
provide_run_metadata=True,
)
]
for path in PATHS_TO_REGISTER:
if os.path.exists(path):
st.storage += [strax.DataDirectory(path, provide_run_metadata=False, deep_scan=False, readonly=True)]
else:
# just means we are in another ho
pass
st.storage += [
strax.DataDirectory(output_folder),
]
if corrections_version is not None:
apply_global_correction_version(st, corrections_version)
return st
[docs]def context_for_daq_reader(
st: strax.Context,
run_id: str,
detector: str = "xams",
runs_col_kwargs: dict = None,
run_doc: dict = None,
check_exists=True,
):
"""
Given a context and run_id, change the options such that we can
process the live data.
IMPORTANT:
After setting the context, we specify the location of the live-data
for a single run. This means you CANNOT re-use this context!
Therefore, if you want to process data, you should start a new
context if you want to process another run starting from the live
data
:param st: Context to change
:param run_id: the run_id of the run that should be processed
:param runs_col_kwargs: Optional options (kwargs) for starting the
run-collection, see `get_mongo_collection`
:param run_doc: Optional document associated with this run-id.
:return: Context ready to start processing <run_id> with from the
live-data
"""
if check_exists and _check_raw_records_exists(st, run_id):
raise ValueError(f'raw data is stored for {run_id} disable check by setting "check_exists=False"')
run_doc = _get_run_doc_for_daq_reader(run_doc=run_doc, run_id=run_id, detector=detector)
daq_config = run_doc["daq_config"]
_set_optional_channel_map(st=st, run_doc=run_doc)
input_dir = _resolve_input_dir_for_daq_reader(st=st, run_id=run_id, daq_config=daq_config)
st.set_context_config(dict(forbid_creation_of=tuple()))
st.set_config(_build_daq_reader_config(run_doc=run_doc, daq_config=daq_config, input_dir=input_dir))
UserWarning(f'You changed the context for {run_id}. Do not process any other run!')
return st
def _get_run_doc_for_daq_reader(run_doc: dict, run_id: str, detector: str) -> dict:
if run_doc is not None:
return run_doc
run_col = ax.get_mongo_collection(detector)
return run_col.find_one({"number": int(run_id)})
def _normalize_channel_map(value):
if not isinstance(value, dict):
return None
out = {}
for key, val in value.items():
if isinstance(val, (list, tuple)) and len(val) == 2:
out[str(key)] = (int(val[0]), int(val[1]))
return immutabledict(out) if out else None
def _resolve_run_channel_map(run_doc: dict):
xbk = run_doc.get("xams_bookkeeping") if isinstance(run_doc.get("xams_bookkeeping"), dict) else {}
run_channel_map = _normalize_channel_map(xbk.get("channel_map"))
if run_channel_map is not None:
return run_channel_map
daq_cfg = run_doc.get("daq_config") if isinstance(run_doc.get("daq_config"), dict) else {}
return _normalize_channel_map(daq_cfg.get("channel_map"))
def _set_optional_channel_map(st: strax.Context, run_doc: dict) -> None:
run_channel_map = _resolve_run_channel_map(run_doc=run_doc)
if run_channel_map is not None:
st.set_config({"channel_map": run_channel_map})
def _resolve_input_dir_for_daq_reader(st: strax.Context, run_id: str, daq_config: dict) -> str:
live_dir = daq_config["strax_output_path"]
if "live_data_dir" in st.config:
live_dir = st.config["live_data_dir"]
UserWarning(f"live_data_dir is overwritten to {live_dir}")
input_dir = os.path.join(live_dir, run_id)
if not os.path.exists(input_dir):
raise FileNotFoundError(f"No path at {input_dir}")
return input_dir
def _build_daq_reader_config(run_doc: dict, daq_config: dict, input_dir: str) -> dict:
return {
'readout_threads': daq_config['processing_threads'],
'daq_input_dir': input_dir,
'record_length': daq_config['strax_fragment_payload_bytes'] // 2,
'max_digitizer_sampling_time': 10,
'run_start_time': run_doc['start'].replace(tzinfo=timezone.utc).timestamp(),
'daq_chunk_duration': int(daq_config['strax_chunk_length'] * 1e9),
'daq_overlap_chunk_duration': int(daq_config['strax_chunk_overlap'] * 1e9),
'compressor': daq_config.get('compressor', 'lz4'),
}
[docs]def xams_led(**kwargs):
st = xams(**kwargs)
st.set_context_config({"check_available": ("raw_records", "raw_records_sipm", "records_led", "led_calibration")})
# Return a new context with only raw_records and led_calibration registered
st = st.new_context(replace=True, config=st.config, storage=st.storage, **st.context_config)
st.register([ax.DAQReader, ax.RecordsLED, ax.LEDCalibration])
return st
def _check_raw_records_exists(st: strax.Context, run_id: str) -> bool:
for plugin_name in st._plugin_class_registry.keys():
if "raw" in plugin_name:
if st.is_stored(run_id, plugin_name):
return True
return False
[docs]def apply_global_correction_version(context: strax.Context, global_version: str) -> None:
"""
Set all the relevant correction variables based on the specified global version.
Only for testing purposes, you can add a github branch to the version by adding '@branchname' to the version.
:param global_version: A specific version (e.g., 'v0') to apply corrections.
"""
# Load the global corrections file (e.g., 'global_v0.json')
if "@" in global_version:
global_version, github_branch = global_version.split("@")
else:
github_branch = "master"
global_corrections_file = f"_global_{global_version}.json"
global_corrections = ax.get_correction(global_corrections_file, branch=github_branch)
# Iterate over all the relevant corrections specified in the global file
xams_config = {}
for correction_key, correction_value in global_corrections.items():
if correction_value is None:
warnings.warn(f"No correction file for {correction_key} in version {global_version}")
continue
# we need to check first if it's a string and if it ends with .json
if isinstance(correction_value, str) and correction_value.endswith(".json"):
# If the correction is a file, set the configuration to point to the correct file
# Set the configuration to point to the correct file for each key (e.g., elife, gain)
add_github_branch = "&github_branch=" + github_branch if github_branch is not None else ""
config_value = f"file://{correction_key}?filename={correction_value}{add_github_branch}"
xams_config[correction_key] = config_value
else:
# If the correction is a value, just set the value
xams_config[correction_key] = correction_value
# Set the full configuration in the context
context.set_config(xams_config)
# Add some logging or print statements if needed
print(f"Applied XAMS version {global_version} with corrections: {xams_config}")
return context