import numba
import numpy as np
import strax
from amstrax.SiPMdata import *
export, __all__ = strax.exporter()
# These are also needed in peaklets, since hitfinding is repeated
HITFINDER_OPTIONS = tuple([
strax.Option(
'hit_min_amplitude',
default='pmt_commissioning_initial',
help='Minimum hit amplitude in ADC counts above baseline. '
'See straxen.hit_min_amplitude for options.'
)])
@strax.takes_config(
strax.Option('peak_gap_threshold', default=3000,
help="No hits for this many ns triggers a new peak"),
strax.Option('peak_left_extension', default=1000,
help="Include this many ns left of hits in peaks"),
strax.Option('peak_right_extension', default=1000,
help="Include this many ns right of hits in peaks"),
strax.Option('peak_min_area', default=1,
help="Minimum contributing PMTs needed to define a peak"),
strax.Option('peak_min_pmts', default=1,
help="Minimum contributing PMTs needed to define a peak"),
strax.Option('single_channel_peaks', default=False,
help='Whether single-channel peaks should be reported'),
strax.Option('peak_split_min_height', default=25,
help="Minimum height in PE above a local sum waveform"
"minimum, on either side, to trigger a split"),
strax.Option('peak_split_min_ratio', default=4,
help="Minimum ratio between local sum waveform"
"minimum and maxima on either side, to trigger a split"),
strax.Option('diagnose_sorting', track=False, default=False,
help="Enable runtime checks for sorting and disjointness"),
strax.Option('pmt_channel', default=0,
help="PMT channel for splitting pmt and sipms"), )
class Peaks(strax.Plugin):
depends_on = ('records',)
# data_kind = dict(peaks='peaks')
data_kind = 'peaks'
parallel = 'process'
provides = ('peaks')
rechunk_on_save = True
__version__ = '0.1.50'
# dtype = dict(peaks = strax.peak_dtype(n_channels=8))
dtype = strax.peak_dtype(n_channels=8)
def compute(self, records):
# Remove hits in zero-gain channels
# they should not affect the clustering!
r = records
# gain = np.array([0.324e6,0.323e6,1,0.309e6,0.312e6,0.306e6,0.319e6,0.326e6])
# sample_duration*digitizer_voltage_range/(2**digitizer_bits*pmt_circuit_load_resistor*total_amplification*e)
# total_amplification = gain * factor
# self.to_pe = 2e-9 * 2 / (2**13 * 50 * gain * 10 * 1.602e-19)
self.to_pe = np.ones(16)
hits = strax.find_hits(r)
hits = strax.sort_by_time(hits)
# Rewrite to just peaks/hits
peaks = strax.find_peaks(
hits, self.to_pe,
gap_threshold=self.config['peak_gap_threshold'],
left_extension=self.config['peak_left_extension'],
right_extension=self.config['peak_right_extension'],
min_area=self.config['peak_min_area'],
min_channels=self.config['peak_min_pmts'],
# min_channels=1,
result_dtype=strax.peak_dtype(n_channels=8)
# result_dtype=self.dtype
)
strax.sum_waveform(peaks, r, self.to_pe)
peaks = strax.split_peaks(
peaks, r, self.to_pe,
min_height=self.config['peak_split_min_height'],
min_ratio=self.config['peak_split_min_ratio'])
strax.compute_widths(peaks)
return peaks
[docs]@export
@strax.takes_config(
strax.Option(
'hit_threshold',
default=10,
help='Hitfinder threshold in ADC counts above baseline'),
# PMT pulse processing options
strax.Option(
'save_outside_hits',
default=(3, 20),
help='Save (left, right) samples besides hits; cut the rest'),
)
class Hits(strax.Plugin):
depends_on = 'records'
data_kind = 'peaks'
parallel = 'False'
__version__ = '0.0.2'
rechunk_on_save = False
dtype = strax.hit_dtype
[docs] def compute(self, records):
hits = strax.find_hits(records, threshold=self.config['hit_threshold'])
return hits
# For n_competing, which is temporarily added to PeakBasics
[docs]@export
@strax.takes_config(
strax.Option('min_area_fraction', default=0.5,
help='The area of competing peaks must be at least '
'this fraction of that of the considered peak'),
strax.Option('nearby_window', default=int(1e6),
help='Peaks starting within this time window (on either side)'
'in ns count as nearby.'),
)
class PeakBasics(strax.Plugin):
provides = ('peak_basics',)
depends_on = ('peaks')
data_kind = ('peaks')
parallel = 'False'
rechunk_on_save = False
__version__ = '0.1.7'
dtype = [
(('Start time of the peak (ns since unix epoch)',
'time'), np.int64),
(('End time of the peak (ns since unix epoch)',
'endtime'), np.int64),
(('Peak integral in PE',
'area'), np.float32),
(('Number of PMTs contributing to the peak',
'n_channels'), np.int16),
(('PMT number which contributes the most PE',
'max_pmt'), np.int16),
(('Area of signal in the largest-contributing PMT (PE)',
'max_pmt_area'), np.int32),
(('Width (in ns) of the central 50% area of the peak',
'range_50p_area'), np.float32),
(('Fraction of area seen by the top array',
'area_fraction_top'), np.float32),
(('Length of the peak waveform in samples',
'length'), np.int32),
(('Time resolution of the peak waveform in ns',
'dt'), np.int16),
('n_competing', np.int32, # temporarily due to chunking issues
'Number of nearby larger or slightly smaller peaks')
]
[docs] def compute(self, peaks):
p = peaks
p = strax.sort_by_time(p)
r = np.zeros(len(p), self.dtype)
for q in 'time length dt area'.split():
r[q] = p[q]
r['endtime'] = p['time'] + p['dt'] * p['length']
r['n_channels'] = (p['area_per_channel'] > 0).sum(axis=1)
r['range_50p_area'] = p['width'][:, 5]
r['max_pmt'] = np.argmax(p['area_per_channel'], axis=1)
r['max_pmt_area'] = np.max(p['area_per_channel'], axis=1)
# area_top = p['area_per_channel'][:, :8].sum(axis=1)
area_top = p['area_per_channel'][:, 1:2].sum(axis=1) # top pmt in ch 1
# Negative-area peaks get 0 AFT - TODO why not NaN?
m = p['area'] > 0
r['area_fraction_top'][m] = area_top[m] / p['area'][m]
# n_competing temporarily due to chunking issues
r['n_competing'] = self.find_n_competing(
peaks,
window=self.config['nearby_window'],
fraction=self.config['min_area_fraction'])
return r
# n_competing
[docs] def get_window_size(self):
return 2 * self.config['nearby_window']
[docs] @staticmethod
@numba.jit(nopython=True, nogil=True, cache=False)
def find_n_competing(peaks, window, fraction):
n = len(peaks)
t = peaks['time']
a = peaks['area']
results = np.zeros(n, dtype=np.int16)
left_i = 0
right_i = 0
for i, peak in enumerate(peaks):
while t[left_i] + window < t[i] and left_i < n - 1:
left_i += 1
while t[right_i] - window < t[i] and right_i < n - 1:
right_i += 1
results[i] = np.sum(a[left_i:right_i + 1] > a[i] * fraction)
return results
[docs]@export
class PeakPositions(strax.Plugin):
depends_on = ('peaks', 'peak_classification')
rechunk_on_save = False
__version__ = '0.0.34' # .33 for LNLIKE
dtype = [
('xr', np.float32,
'Interaction x-position'),
('yr', np.float32,
'Interaction y-position'),
('r', np.float32,
'radial distance'),
('time', np.int64, 'Start time of the peak (ns since unix epoch)'),
('endtime', np.int64, 'End time of the peak (ns since unix epoch)')
]
[docs] def setup(self):
# z position of the in-plane SiPMs
z_plane = 10
# radius of the cylinder for SiPMs at the side
r_cylinder = 22
# radius of a SiPM - I assume circular SiPMs with a radius to make the area correspond to a 3x3mm2 square.
r_sipm = 1.6925
# build geometry
geo = GeoParameters(z_plane=z_plane, r_cylinder=r_cylinder, r_sipm=r_sipm)
sipm = SiPM(type="plane", position=[0, -15, z_plane], qeff=0.25)
geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[-13, -7.5, z_plane], qeff=0.25)
geo.add_sipm(sipm)
# sipm = SiPM(type="plane", position=[0, 15, z_plane], qeff=0.25)
# geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[13, -7.5, z_plane], qeff=0.25)
geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[-4, 0, z_plane], qeff=0.25)
geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[4, 0, z_plane], qeff=0.25)
geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[-13, 7.5, z_plane], qeff=0.25)
geo.add_sipm(sipm)
sipm = SiPM(type="plane", position=[13, 7.5, z_plane], qeff=0.25)
geo.add_sipm(sipm)
self.geo = geo
[docs] def compute(self, peaks):
result = np.empty(len(peaks), dtype=self.dtype)
if not len(peaks):
return result
for ix, p in enumerate(peaks):
if p['type'] != 2:
continue
# if [X] channel is not working
k = np.delete(p['area_per_channel'], [2])
for i, area in enumerate(k):
self.geo.sipms[i].set_number_of_hits(area)
# if all 8 channels are working
# for i, area in enumerate(p['area_per_channel']):
# self.geo.sipms[i].set_number_of_hits(area)
posrec = Reconstruction(self.geo)
pos = posrec.reconstruct_position('CHI2')
for key in ['xr', 'yr']:
result[key][ix] = pos[key]
for q in ['time', 'endtime']:
result[q] = p[q]
result['r'] = (result['xr'] ** 2 + result['yr'] ** 2) ** (1 / 2)
return result
[docs]@export
@strax.takes_config(
strax.Option('s1_max_width', default=60,
help="Maximum (IQR) width of S1s"),
strax.Option('s1_min_area', default=4,
help="Minimum area (PE) for S1s"),
strax.Option('s2_min_area', default=4,
help="Minimum area (PE) for S2s"),
strax.Option('s2_min_width', default=100,
help="Minimum width for S2s"))
class PeakClassification(strax.Plugin):
rechunk_on_save = False
__version__ = '0.0.4'
depends_on = ('peak_basics')
dtype = [
('type', np.int8, 'Classification of the peak.'),
('time', np.int64, 'Start time of the peak (ns since unix epoch)'),
('endtime', np.int64, 'End time of the peak (ns since unix epoch)')
]
parallel = True
[docs] def compute(self, peaks):
p = peaks
r = np.zeros(len(p), dtype=self.dtype)
is_s1 = p['area'] >= self.config['s1_min_area']
is_s1 &= p['range_50p_area'] < self.config['s1_max_width']
r['type'][is_s1] = 1
is_s2 = p['area'] > self.config['s2_min_area']
is_s2 &= p['range_50p_area'] > self.config['s2_min_width']
r['type'][is_s2] = 2
for q in ['time', 'endtime']:
r[q] = p[q]
return r
# n_competing experiencing re-chunking issues, temporarily added to PeakBasics return results - 1
# @export
# @strax.takes_config(
# strax.Option('min_area_fraction', default=0.5,
# help='The area of competing peaks must be at least '
# 'this fraction of that of the considered peak'),
# strax.Option('nearby_window', default=int(1e6),
# help='Peaks starting within this time window (on either side)'
# 'in ns count as nearby.'),
# )
# class NCompeting(strax.OverlapWindowPlugin): #from NCompetingTop
# depends_on = ('peak_basics',) #from peak_basics_top
# rechunk_on_save = False
# dtype = [
# ('n_competing', np.int32,
# 'Number of nearby larger or slightly smaller peaks'),
# ('time', np.int64, 'Start time of the peak (ns since unix epoch)'),
# ('endtime', np.int64, 'End time of the peak (ns since unix epoch)')
# ]
# __version__ = '0.0.27'
#
# def get_window_size(self):
# return 2 * self.config['nearby_window']
#
# def compute(self, peaks):
# results=np.zeros(len(peaks),dtype=self.dtype)
# results['time']=peaks['time']
# results['endtime']=peaks['endtime']
# results['n_competing']=self.find_n_competing(
# peaks,
# window=self.config['nearby_window'],
# fraction=self.config['min_area_fraction'])
# return results
#
# @staticmethod
# @numba.jit(nopython=True, nogil=True, cache=False)
# def find_n_competing(peaks, window, fraction):
# n = len(peaks)
# t = peaks['time']
# a = peaks['area']
# results = np.zeros(n, dtype=np.int16)
# left_i = 0
# right_i = 0
# for i, peak in enumerate(peaks):
# while t[left_i] + window < t[i] and left_i < n - 1:
# left_i += 1
# while t[right_i] - window < t[i] and right_i < n - 1:
# right_i += 1
# results[i] = np.sum(a[left_i:right_i + 1] > a[i] * fraction)
#
# return results