Source code for amstrax.plugins.pax_interface

"""Convert pax .zip files to flat records format
"""
import glob
import os

import numba
import numpy as np
import strax
from straxen.plugins.pax_interface import records_needed

export, __all__ = strax.exporter()


[docs]@export def pax_to_records(input_filename, samples_per_record=strax.DEFAULT_RECORD_LENGTH, events_per_chunk=10, dt=10): """ Same as straxen.pax_interface apart from dt change Return pulse records array from pax zip input_filename This only works if you have pax installed in your strax environment, which is somewhat tricky. """ # Monkeypatch matplotlib so pax is importable # See https://github.com/XENON1T/pax/pull/734 import matplotlib matplotlib._cntr = None from pax import core # Pax is not a dependency mypax = core.Processor('XENON1T', config_dict=dict( pax=dict( look_for_config_in_runs_db=False, plugin_group_names=['input'], encoder_plugin=None, input_name=input_filename), # Fast startup: skip loading big maps WaveformSimulator=dict( s1_light_yield_map='placeholder_map.json', s2_light_yield_map='placeholder_map.json', s1_patterns_file=None, s2_patterns_file=None))) print(f"Starting conversion, {events_per_chunk} evt/chunk") results = [] def finish_results(): nonlocal results records = np.concatenate(results) # In strax data, records are always stored # sorted, baselined and integrated records = strax.sort_by_time(records) print("Returning %d records" % len(records)) results = [] return records for event in mypax.get_events(): event = mypax.process_event(event) if not len(event.pulses): # Triggerless pax data contains many empty events # at the end. With the fixed events per chunk setting # this can lead to empty files, which confuses strax. continue pulse_lengths = np.array([p.length for p in event.pulses]) n_records_tot = records_needed(pulse_lengths, samples_per_record).sum() records = np.zeros(n_records_tot, dtype=strax.raw_record_dtype(samples_per_record)) output_record_index = 0 # Record offset in data for p in event.pulses: n_records = records_needed(p.length, samples_per_record) for rec_i in range(n_records): r = records[output_record_index] r['dt'] = dt r['time'] = (event.start_time + p.left * dt + rec_i * samples_per_record * dt) r['channel'] = p.channel r['pulse_length'] = p.length r['record_i'] = rec_i # How much are we storing in this record? if rec_i != n_records - 1: # There's more chunks coming, so we store a full chunk n_store = samples_per_record assert p.length > samples_per_record * (rec_i + 1) else: # Just enough to store the rest of the data # Note it's not p.length % samples_per_record!!! # (that would be zero if we have to store a full record) n_store = p.length - samples_per_record * rec_i assert 0 <= n_store <= samples_per_record r['length'] = n_store offset = rec_i * samples_per_record r['data'][:n_store] = p.raw_data[offset:offset + n_store] output_record_index += 1 results.append(records) if len(results) >= events_per_chunk: yield finish_results() mypax.shutdown() if len(results): y = finish_results() if len(y): yield y
def pax_to_records_zle(input_filename, samples_per_record=strax.DEFAULT_RECORD_LENGTH, events_per_chunk=10, dt=10): """Return pulse records array from pax zip input_filename This only works if you have pax installed in your strax environment, which is somewhat tricky. """ # Monkeypatch matplotlib so pax is importable # See https://github.com/XENON1T/pax/pull/734 import matplotlib matplotlib._cntr = None from pax import core # Pax is not a dependency mypax = core.Processor('XENON1T', config_dict=dict( pax=dict( look_for_config_in_runs_db=False, plugin_group_names=['input'], encoder_plugin=None, input_name=input_filename), # Fast startup: skip loading big maps WaveformSimulator=dict( s1_light_yield_map='placeholder_map.json', s2_light_yield_map='placeholder_map.json', s1_patterns_file=None, s2_patterns_file=None))) print(f"Starting conversion, {events_per_chunk} evt/chunk") results = [] def finish_results(): nonlocal results records = np.concatenate(results) # In strax data, records are always stored # sorted, baselined and integrated records = strax.sort_by_time(records) print("Returning %d records" % len(records)) results = [] return records for event in mypax.get_events(): event = mypax.process_event(event) if not len(event.pulses): # Triggerless pax data contains many empty events # at the end. With the fixed events per chunk setting # this can lead to empty files, which confuses strax. continue pulse_lengths = np.array([p.length for p in event.pulses]) output_record_index = 0 # Record offset in data for p in event.pulses: for left, right, data in find_intervals(p): pulse_length = right - left + 1 records_needed = int(np.ceil(pulse_length / samples_per_record)) records = np.zeros(records_needed, dtype=strax.raw_record_dtype(samples_per_record)) records['channel'] = p.channel records['dt'] = dt records['time'] = event.start_time + p.left * dt + dt * ( left + samples_per_record * np.arange(records_needed)) records['length'] = [min(pulse_length, samples_per_record * (i + 1)) - samples_per_record * i for i in range(records_needed)] records['pulse_length'] = pulse_length records['record_i'] = np.arange(records_needed) records['data'] = np.pad(data, (0, records_needed * samples_per_record - pulse_length), 'constant').reshape((-1, samples_per_record)) output_record_index += records_needed results.append(records) if len(results) >= events_per_chunk: yield finish_results() mypax.shutdown() if len(results): y = finish_results() if len(y): yield y @numba.jit(numba.int32(numba.int16[:], numba.int64, numba.int64, numba.int64[:, :]), nopython=True) def find_intervals_below_threshold(w, threshold, holdoff, result_buffer): """Fills result_buffer with l, r bounds of intervals in w < threshold. :param w: Waveform to do hitfinding in :param threshold: Threshold for including an interval :param result_buffer: numpy N*2 array of ints, will be filled by function. if more than N intervals are found, none past the first N will be processed. :returns : number of intervals processed Boundary indices are inclusive, i.e. the right boundary is the last index which was < threshold """ result_buffer_size = len(result_buffer) last_index_in_w = len(w) - 1 in_interval = False current_interval = 0 current_interval_start = -1 current_interval_end = -1 for i, x in enumerate(w): if x < threshold: if not in_interval: # Start of an interval in_interval = True current_interval_start = i current_interval_end = i if ((i == last_index_in_w and in_interval) or (x >= threshold and i >= current_interval_end + holdoff and in_interval)): # End of the current interval in_interval = False # Add bounds to result buffer result_buffer[current_interval, 0] = current_interval_start result_buffer[current_interval, 1] = current_interval_end current_interval += 1 if current_interval == result_buffer_size: result_buffer[current_interval, 1] = len(w) - 1 n_intervals = current_interval # No +1, as current_interval was incremented also when the last interval closed return n_intervals def find_intervals(pulse): data = pulse.raw_data zle_intervals_buffer = -1 * np.ones((50000, 2), dtype=np.int64) # For simulated data taking reference baseline as baseline # Operating directly on digitized downward waveform threshold = int(np.median(data) - 2 * np.std(data)) n_itvs_found = find_intervals_below_threshold( data, threshold=threshold, holdoff=101, result_buffer=zle_intervals_buffer, ) itvs_to_encode = zle_intervals_buffer[:n_itvs_found] itvs_to_encode[:, 0] -= 50 itvs_to_encode[:, 1] += 50 itvs_to_encode = np.clip(itvs_to_encode, 0, len(data) - 1) # Land trigger window on even numbers itvs_to_encode[:, 0] = np.ceil(itvs_to_encode[:, 0] / 2.0) * 2 itvs_to_encode[:, 1] = np.floor(itvs_to_encode[:, 1] / 2.0) * 2 for itv in itvs_to_encode: print(itv) yield itv[0], itv[1], data[itv[0]:itv[1] + 1]
[docs]@export @strax.takes_config( strax.Option('pax_raw_dir', default='/data/xenon/raw', track=False, help="Directory with raw pax datasets"), strax.Option('stop_after_zips', default=0, track=False, help="Convert only this many zip files. 0 = all."), strax.Option('events_per_chunk', default=50, track=False, help="Number of events to yield per chunk"), strax.Option('samples_per_record', default=strax.DEFAULT_RECORD_LENGTH, track=False, help="Number of samples per record") ) class RecordsFromPax(strax.Plugin): provides = 'raw_records' data_kind = 'raw_records' compressor = 'zstd' depends_on = tuple() parallel = False rechunk_on_save = False __version__ = '0.0.2'
[docs] def infer_dtype(self): return strax.raw_record_dtype(self.config['samples_per_record'])
[docs] def iter(self, *args, **kwargs): if not os.path.exists(self.config['pax_raw_dir']): raise FileNotFoundError(self.config['pax_raw_dir']) input_dir = os.path.join(self.config['pax_raw_dir'], self.run_id) pax_files = sorted(glob.glob(input_dir + '/XAMS*.zip')) pax_sizes = np.array([os.path.getsize(x) for x in pax_files]) print(f"Found {len(pax_files)} files, {pax_sizes.sum() / 1e9:.2f} GB") last_endtime = 0 if self.run_id[-4:] == '1730': self.config['dt'] = 2 else: self.config['dt'] = 10 for file_i, in_fn in enumerate(pax_files): if (self.config['stop_after_zips'] and file_i >= self.config['stop_after_zips']): break for records in pax_to_records_zle( in_fn, samples_per_record=self.config['samples_per_record'], events_per_chunk=self.config['events_per_chunk'], dt=self.config['dt']): if not len(records): continue if last_endtime == 0: last_endtime = records[0]['time'] new_endtime = strax.endtime(records).max() yield self.chunk(start=last_endtime, end=new_endtime, data=records) last_endtime = new_endtime