import glob
import warnings
import numpy as np
import strax
import straxen
from immutabledict import immutabledict
from straxen.plugins.daqreader import split_channel_ranges
# Channel number written into the records that mark artificial deadtime.
ARTIFICIAL_DEADTIME_CHANNEL = 40

# strax.exporter() hands back the `export` decorator together with the
# module's `__all__` list. The constant above cannot be decorated, so it
# is added to `__all__` by hand.
export, __all__ = strax.exporter()
__all__ += ['ARTIFICIAL_DEADTIME_CHANNEL']
class ArtificialDeadtimeInserted(UserWarning):
    """Warning category used when artificial deadtime is inserted.

    Emitted by the DAQ reader when no safe break can be found in the
    data and a stretch of it is flagged as artificial deadtime instead.
    """
@export
class DAQReader(straxen.DAQReader):
    """
    Read the XAMS DAQ live_data from redax and split it into the
    appropriate raw_record data types based on the channel map.

    Does nothing whatsoever to the live_data; not even baselining.

    Provides:
     - raw_records_v1724, sampled from the V1724 digitizer with
       sampling resolution = 10ns
     - raw_records_v1730, sampled from the V1730 digitizer with
       sampling resolution = 2ns
     - raw_records_aqmon, actually empty unless we need some
       strax-deadtime

    NOTE(review): ``self.t0``, ``self.dt_max`` and ``self._chunk_paths``
    are used below but not defined in this class; presumably they are
    provided by ``straxen.DAQReader`` -- confirm.
    """
    provides = (
        'raw_records_v1724',
        'raw_records_v1730',
        'raw_records_aqmon',
    )
    depends_on = tuple()
    # Every provided data type is its own data kind.
    data_kind = immutabledict(zip(provides, provides))
    parallel = 'process'
    rechunk_on_save = False
    # never change the version!
    __version__ = '0.0.0'
    compressor = 'zstd'

    def infer_dtype(self):
        """Return the raw-record dtype for each provided data type.

        The dtype is built by ``strax.raw_record_dtype`` from the
        configured ``record_length``. A single dtype is returned when the
        plugin is not multi-output, otherwise a dict keyed by the names
        in ``provides``.
        """
        if not self.multi_output:
            return strax.raw_record_dtype(
                samples_per_record=self.config["record_length"])
        return {
            d: strax.raw_record_dtype(
                samples_per_record=self.config["record_length"])
            for d in self.provides}

    def _load_chunk(self, path, start, end, kind='central'):
        """Load all records stored in the directory ``path``.

        :param path: directory holding the files of one DAQ chunk.
        :param start: earliest allowed record start time (inclusive).
        :param end: upper limit on record start times (exclusive).
        :param kind: 'central' returns every record; 'pre' and 'post'
            additionally look for a time at which the data can be split.
        :return: ``(records, break_time)``; ``break_time`` is None for
            ``kind='central'``.
        :raises ValueError: if a record starts outside ``[start, end)``.
        """
        # All provided data types share one dtype (see infer_dtype), so
        # the first one is used to read the files.
        first_provides = self.provides[0]
        records = [
            strax.load_file(
                fn,
                compressor=self.config["daq_compressor"],
                dtype=self.dtype_for(first_provides))
            for fn in sorted(glob.glob(f'{path}/*'))]
        records = np.concatenate(records)
        records = strax.sort_by_time(records)

        first_start, last_start, last_end = None, None, None
        if len(records):
            first_start, last_start = records[0]['time'], records[-1]['time']
            # Records are sorted by (start)time and are of variable length.
            # Their end-times can differ. In the most pessimistic case we have
            # to look back one record length for each channel.
            # (np.diff(x) + 1: presumably each channel_map value is an
            # inclusive (first, last) channel range -- confirm.)
            tot_channels = np.sum([np.diff(x) + 1 for x in
                                   self.config['channel_map'].values()])
            look_n_samples = self.config["record_length"] * tot_channels
            last_end = strax.endtime(records[-look_n_samples:]).max()
            if first_start < start or last_start >= end:
                raise ValueError(
                    f"Bad data from DAQ: chunk {path} should contain data "
                    f"that starts in [{start}, {end}), but we see start times "
                    f"ranging from {first_start} to {last_start}.")

        if kind == 'central':
            result = records
            break_time = None
        else:
            # Find a time at which we can safely partition the data.
            min_gap = self.config['safe_break_in_pulses']
            if not len(records) or last_end + min_gap < end:
                # There is enough room at the end of the data
                break_time = end - min_gap
                result = records if kind == 'post' else records[:0]
            else:
                # Let's hope there is some quiet time in the middle
                try:
                    result, break_time = strax.from_break(
                        records,
                        safe_break=min_gap,
                        # Records from the last chunk can extend as far as:
                        not_before=(
                            start
                            + self.config['record_length'] * self.dt_max),
                        left=kind == 'post',
                        tolerant=False)
                except strax.NoBreakFound:
                    # We still have to break somewhere, but this can involve
                    # throwing away data.
                    # Let's do it near the end of the chunk, one
                    # satisfactory gap (min_gap) before `end`.
                    break_time = end - min_gap
                    # Mark the region where data /might/ be removed with
                    # artificial deadtime.
                    dead_time_start = (
                        break_time
                        - self.config['record_length'] * self.dt_max)
                    warnings.warn(
                        f"Data in {path} is so dense that no {min_gap} "
                        f"ns break exists: data loss inevitable. "
                        f"Inserting artificial deadtime between "
                        f"{dead_time_start} and {end}.",
                        ArtificialDeadtimeInserted)

                    if kind == 'pre':
                        # Give the artificial deadtime past the break
                        result = self._artificial_dead_time(
                            start=break_time, end=end, dt=self.dt_max)
                    else:
                        # Remove data that would stick out
                        result = records[strax.endtime(records) <= break_time]
                        # Add the artificial deadtime until the break
                        result = strax.sort_by_time(
                            np.concatenate([
                                result,
                                self._artificial_dead_time(
                                    start=dead_time_start,
                                    end=break_time,
                                    dt=self.dt_max)]))
        return result, break_time

    def _artificial_dead_time(self, start, end, dt):
        """Build one record that marks ``[start, end)`` as artificial deadtime.

        :param start: start time of the deadtime record.
        :param end: end time of the deadtime region.
        :param dt: sample duration; the record length is
            ``(end - start) // dt`` samples.
        :return: array holding a single record on channel
            ``ARTIFICIAL_DEADTIME_CHANNEL``.
        """
        return strax.dict_to_rec(
            dict(time=[start],
                 length=[(end - start) // dt],
                 dt=[dt],
                 channel=[ARTIFICIAL_DEADTIME_CHANNEL]),
            # 'raw_records' is not among this plugin's `provides`, so use
            # the first provided data type, exactly as _load_chunk does.
            # This keeps the result concatenable with the loaded records.
            self.dtype_for(self.provides[0]))

    def _path(self, chunk_i):
        """Return the input directory of chunk ``chunk_i`` (zero-padded to 6 digits)."""
        return self.config["daq_input_dir"] + f'/{chunk_i:06d}'

    def compute(self, chunk_i):
        """Read DAQ chunk ``chunk_i`` and split it per sub-detector.

        Loads the central chunk plus, when present, the neighbouring
        'pre' and 'post' overlap chunks, splits the records by the
        channel ranges in the ``channel_map`` config and wraps each
        non-'blank' part in a strax chunk.

        :param chunk_i: index of the DAQ chunk to read.
        :return: dict mapping ``'raw_records_<subdetector>'`` to the
            corresponding chunk.
        """
        dt_central = self.config['daq_chunk_duration']
        dt_overlap = self.config['daq_overlap_chunk_duration']

        t_start = chunk_i * (dt_central + dt_overlap)
        t_end = t_start + dt_central

        pre, current, post = self._chunk_paths(chunk_i)
        r_pre, r_post = None, None
        # Without overlap chunks the result spans exactly the central chunk.
        break_pre, break_post = t_start, t_end

        if pre:
            if chunk_i == 0:
                warnings.warn(
                    f"DAQ is being sloppy: there should be no pre dir {pre} "
                    f"for chunk 0. We're ignoring it.",
                    UserWarning)
            else:
                r_pre, break_pre = self._load_chunk(
                    path=pre,
                    start=t_start - dt_overlap,
                    end=t_start,
                    kind='pre')

        r_main, _ = self._load_chunk(
            path=current,
            start=t_start,
            end=t_end,
            kind='central')

        if post:
            r_post, break_post = self._load_chunk(
                path=post,
                start=t_end,
                end=t_end + dt_overlap,
                kind='post')

        # Concatenate the result.
        records = np.concatenate([
            x for x in (r_pre, r_main, r_post)
            if x is not None])

        # Split records by channel
        result_arrays = split_channel_ranges(
            records,
            np.asarray(list(self.config['channel_map'].values())))
        del records

        # Convert to strax chunks
        result = dict()
        for i, subd in enumerate(self.config['channel_map']):
            if len(result_arrays[i]):
                # dt may differ per subdetector
                dt = result_arrays[i]['dt'][0]
                # Convert time to time in ns since unix epoch.
                # Ensure the offset is a whole digitizer sample
                result_arrays[i]["time"] += dt * (self.t0 // dt)

            # Ignore data from the 'blank' channels, corresponding to
            # channels that have nothing connected
            if subd.endswith('blank'):
                continue

            result_name = 'raw_records'
            result_name += '_' + subd
            result[result_name] = self.chunk(
                start=self.t0 + break_pre,
                end=self.t0 + break_post,
                data=result_arrays[i],
                data_type=result_name)

        print(f"Read chunk {chunk_i:06d} from DAQ")
        for r in result.values():
            # Print data rate / data type if any
            if r._mbs() > 0:
                print(f"\t{r}")
        return result
@export
class Fake1TDAQReader(DAQReader):
    """DAQReader variant that only overrides the provided data types.

    Everything else is inherited from ``DAQReader``; only ``provides``
    and the matching ``data_kind`` mapping differ.
    """
    provides = (
        'raw_records',
        'raw_records_diagnostic',
        'raw_records_aqmon')
    # Every provided data type is its own data kind.
    data_kind = immutabledict(zip(provides, provides))