import os
import re
import socket
import typing
from typing import Union, Dict, Any
import configparser
import pymongo
import strax
from sshtunnel import SSHTunnelForwarder
from tqdm import tqdm
# strax.exporter() returns the ``export`` decorator together with this
# module's ``__all__`` list. The definitions below decorated with ``@export``
# are meant to end up in ``__all__`` (presumably appended by the decorator --
# confirm against the strax helper).
export, __all__ = strax.exporter()
# Configuration: module-wide default names.
CONFIG = dict(
    # Database in the Mongo URI of get_mongo_client; default DB of RunDB.
    DEFAULT_DBNAME='admin',
    # Default detector for get_mongo_collection.
    DEFAULT_DETECTOR='xams',
    # Default database name indexed by get_mongo_collection.
    DEFAULT_RUNCOLNAME='run',
    # Default runs collection of RunDB.
    DEFAULT_COLLECTION='runs_gas',
)
def get_configs() -> Dict[str, Any]:
    """Collect the DAQ / MongoDB connection settings.

    Each required key is looked up first in the ``[default]`` section of
    ``~/.xams_config`` (an INI file read with :mod:`configparser`) and, if it
    is not there, in the environment variables.

    :return: dict mapping every required key to its (string) value.
    :raises ValueError: if a required key is found in neither place.
    """
    required_keys = ['DAQ_PASSWORD', 'DAQ_HOST', 'DAQ_USER',
                     'MONGO_USER', 'MONGO_PASSWORD', 'MONGO_PORT']
    # expanduser('~') still works when $HOME is unset; os.getenv('HOME')
    # would return None and make os.path.join raise TypeError.
    config_file_path = os.path.join(os.path.expanduser('~'), '.xams_config')

    # Values from the config file. Stays empty when the file or its
    # [default] section is missing, so every key falls back to the environment.
    file_values = {}
    if not os.path.exists(config_file_path):
        print(f'Warning: could not find {config_file_path}. Will try to get the configuration from environment variables.')
    else:
        parser = configparser.ConfigParser()
        parser.read(config_file_path)
        if parser.has_section('default'):
            file_values = parser['default']
        else:
            print(f'Warning: no [default] section in {config_file_path}. Will try to get the configuration from environment variables.')

    result = {}
    for key in required_keys:
        # Section lookups go through configparser's optionxform (lower-case
        # by default), so the upper-case key names match case-insensitively.
        # NOTE(review): default interpolation is active, so a literal '%' in a
        # value must be written as '%%' in the file.
        if key in file_values:
            result[key] = file_values[key]
        elif key in os.environ:
            result[key] = os.environ[key]
        else:
            raise ValueError(f'Could not find {key} in environment variables or default configuration in ~/.xams_config')
    return result
def establish_ssh_tunnel(daq_host: str, daq_user: str, daq_password: str,
                         mongo_port: Union[int, str],
                         secret_serving_port: Dict[str, Any]) -> int:
    """Open (or reuse) an SSH tunnel to MongoDB on the DAQ host.

    The tunnel forwards a local port to ``('127.0.0.1', mongo_port)`` on the
    remote side of ``daq_host``. Opened local ports are cached in
    ``secret_serving_port``, so repeated calls for the same host, user and
    remote port return the cached port instead of opening a new tunnel.
    This function never stops the tunnel it starts.

    :param daq_host: SSH host to tunnel through.
    :param daq_user: SSH user name.
    :param daq_password: SSH password.
    :param mongo_port: MongoDB port on the remote side (int or numeric str).
    :param secret_serving_port: cache dict mapping ``'<host>_<user>_<port>'``
        to a local bind port; updated in place.
    :return: the local port to connect to.
    """
    remote_port = int(mongo_port)
    # The remote port is part of the key: a tunnel to one port must not be
    # handed out for a request to a different port.
    port_key = f'{daq_host}_{daq_user}_{remote_port}'
    if port_key in secret_serving_port:
        return secret_serving_port[port_key]
    server = SSHTunnelForwarder(
        daq_host,
        ssh_username=daq_user,
        ssh_password=daq_password,
        remote_bind_address=('127.0.0.1', remote_port),
    )
    server.start()
    secret_serving_port[port_key] = server.local_bind_port
    return server.local_bind_port
# Process-wide cache of SSH tunnel ports, used when the caller does not pass
# its own dict, so the tunnel is opened once per host/user/port.
_SSH_TUNNEL_PORTS: Dict[str, Any] = {}


@export
def get_mongo_client(daq_host: str = "",
                     daq_user: str = "",
                     secret_serving_port: typing.Optional[Dict[str, Any]] = None,
                     ) -> pymongo.MongoClient:
    """Return a MongoClient connected to the runs database through SSH.

    Passwords, user names and the Mongo port come from :func:`get_configs`.

    :param daq_host: SSH host to tunnel through. If empty, ``DAQ_HOST`` from
        the configuration is used.
    :param daq_user: SSH user. If empty, ``DAQ_USER`` from the configuration
        is used.
    :param secret_serving_port: cache of already-open tunnel ports, passed to
        :func:`establish_ssh_tunnel`. Defaults to a module-level dict shared
        by all calls.
    :return: a :class:`pymongo.MongoClient` whose URI names
        ``CONFIG['DEFAULT_DBNAME']`` as database.
    :raises ValueError: propagated from :func:`get_configs` when a setting
        is missing.
    """
    from urllib.parse import quote_plus

    if secret_serving_port is None:
        secret_serving_port = _SSH_TUNNEL_PORTS
    configs = get_configs()
    # Explicit arguments take precedence over the configuration.
    daq_host = daq_host or configs['DAQ_HOST']
    daq_user = daq_user or configs['DAQ_USER']
    daq_password = configs['DAQ_PASSWORD']
    mongo_port = int(configs['MONGO_PORT'])
    local_port = establish_ssh_tunnel(daq_host, daq_user, daq_password,
                                      mongo_port, secret_serving_port)
    # User name and password must be percent-escaped inside a MongoDB URI;
    # characters such as '@', ':' or '/' would otherwise break URI parsing.
    user = quote_plus(configs['MONGO_USER'])
    password = quote_plus(configs['MONGO_PASSWORD'])
    return pymongo.MongoClient(
        f'mongodb://{user}:{password}@127.0.0.1:{local_port}/{CONFIG["DEFAULT_DBNAME"]}')
@export
def get_mongo_collection(detector: str = CONFIG['DEFAULT_DETECTOR'],
                         runcolname: str = CONFIG['DEFAULT_RUNCOLNAME'],
                         **link_kwargs
                         ) -> pymongo.collection.Collection:
    """Return the runs collection belonging to ``detector``.

    :param detector: detector name. ``'xams'`` maps to collection
        ``'runs_gas'`` and ``'xamsl'`` to ``'runs_new'``.
    :param runcolname: name of the MongoDB *database* that holds the runs
        collections (used as ``client[runcolname]``).
    :param link_kwargs: forwarded to :func:`get_mongo_client`.
    :return: the selected :class:`pymongo.collection.Collection`.
    :raises NameError: if ``detector`` is not a known detector name.
    """
    collection_names = {
        'xams': 'runs_gas',
        'xamsl': 'runs_new',
    }
    # Validate before connecting so an unknown detector does not first open
    # an SSH tunnel and a Mongo client.
    if detector not in collection_names:
        raise NameError(f'detector {detector} is not a valid detector name.')
    client = get_mongo_client(**link_kwargs)
    return client[runcolname][collection_names[detector]]
@export
class RunDB(strax.StorageFrontend):
    """Frontend that searches the runs MongoDB for data.

    Each run document carries a ``data`` array. An entry counts as available
    only if it was registered for this host or for one of the aliases in
    :attr:`hosts` that matches the host name. Only ``strax.FileSytemBackend``
    is configured as a backend.
    """

    # Dict of alias used in rundb: regex on hostname
    hosts = {
        'dali': r'^dali.*rcc.*',
    }

    provide_run_metadata = True

    def __init__(self,
                 mongo_dbname=CONFIG['DEFAULT_DBNAME'],
                 mongo_collname=CONFIG['DEFAULT_COLLECTION'],
                 runid_field='name',
                 local_only=True,
                 new_data_path=None,
                 reader_ini_name_is_mode=False,
                 readonly=True,
                 *args,
                 **kwargs):
        """
        :param mongo_dbname: MongoDB database that holds the runs collection.
        :param mongo_collname: name of the runs collection in that database.
        :param runid_field: run-document field corresponding to strax's
            run_id concept. Either
            - 'name': values are strings, used as-is
            - 'number': values are ints; run_ids are converted with int()
        :param local_only: stored as ``self.local_only``. It is not read by the
            methods defined in this class: availability queries are always
            restricted to this host and its aliases.
        :param new_data_path: directory where new data is written. New data
            entries are registered in the runs DB. Defaults to None: no new
            data is written and the frontend is forced read-only.
        :param reader_ini_name_is_mode: if True, ``_scan_runs`` overwrites
            the 'mode' field with 'reader.ini.name'.
        :param readonly: read-only flag; forced to True when
            ``new_data_path`` is None.
        Other (kw)args are passed to StorageFrontend.__init__
        """
        super().__init__(*args, **kwargs)
        self.local_only = local_only
        self.new_data_path = new_data_path
        self.reader_ini_name_is_mode = reader_ini_name_is_mode
        # Without a target directory nothing can be written.
        self.readonly = True if new_data_path is None else readonly

        self.runid_field = runid_field
        if self.runid_field not in ['name', 'number']:
            raise ValueError("Unrecognized runid_field option %s" % self.runid_field)

        self.client = get_mongo_client()
        self.collection = self.client[mongo_dbname][mongo_collname]

        self.backends = [
            strax.FileSytemBackend(),
        ]

        # Construct mongo query for runs with available data.
        # This depends on the machine you're running on.
        self.hostname = socket.getfqdn()
        self.available_query = [{'host': self.hostname}]
        # Go through known host aliases
        for host_alias, regex in self.hosts.items():
            if re.match(regex, self.hostname):
                self.available_query.append({'host': host_alias})

    def _run_id_value(self, run_id):
        """Convert a strax run_id to the value stored in ``self.runid_field``.

        The run_id is returned unchanged for 'name' and converted with
        ``int`` for 'number'.
        """
        if self.runid_field == 'name':
            return run_id
        return int(run_id)

    def _data_query(self, key):
        """Return MongoDB query for data field matching key"""
        return {
            'data': {
                '$elemMatch': {
                    'type': key.data_type,
                    'meta.lineage': key.lineage,
                    '$or': self.available_query}}}

    def _find(self, key: strax.DataKey,
              write, allow_incomplete, fuzzy_for, fuzzy_for_options):
        """Return ``(protocol, location)`` of the data for ``key``.

        If the data is not registered and ``write`` is requested, a new entry
        under ``new_data_path`` is registered in the run document and its
        location returned.

        :raises NotImplementedError: if fuzzy matching is requested.
        :raises strax.DataNotAvailable: if the data is not registered and
            either ``write`` is False or there is no ``new_data_path``.
        :raises strax.DataExistsError: if writing over existing data that may
            not be overwritten.
        :raises ValueError: if registering data for a run that does not exist.
        """
        if fuzzy_for or fuzzy_for_options:
            raise NotImplementedError("Can't do fuzzy with RunDB yet.")

        run_query = {self.runid_field: self._run_id_value(key.run_id)}
        dq = self._data_query(key)
        doc = self.collection.find_one({**run_query, **dq}, projection=dq)

        if doc is not None:
            # The $elemMatch projection limits 'data' to the matching entry.
            datum = doc['data'][0]
            if write and not self._can_overwrite(key):
                raise strax.DataExistsError(at=datum['location'])
            return datum['protocol'], datum['location']

        # Data was not found. A new location can only be offered when writing
        # was requested and there is a directory to write into.
        if not write or self.new_data_path is None:
            raise strax.DataNotAvailable

        run_doc = self.collection.find_one(run_query, projection=['_id'])
        if not run_doc:
            raise ValueError(
                f"Attempt to register new data for non-existing run {key.run_id}")  # noqa
        output_path = os.path.join(self.new_data_path, str(key))
        self.collection.find_one_and_update(
            {'_id': run_doc['_id']},
            {'$push': {'data': {
                'location': output_path,
                'host': self.hostname,
                'type': key.data_type,
                'protocol': strax.FileSytemBackend.__name__,
                'meta': {'lineage': key.lineage}
            }}})
        return (strax.FileSytemBackend.__name__,
                output_path)

    def find_several(self, keys: typing.List[strax.DataKey], **kwargs):
        """Look up several keys sharing one data type and lineage in one query.

        :return: list parallel to ``keys`` holding ``(protocol, location)``
            for each key whose data is available here, ``False`` otherwise.
        :raises NotImplementedError: if fuzzy matching is requested.
        :raises ValueError: if the keys differ in lineage or data type.
        """
        if kwargs.get('fuzzy_for') or kwargs.get('fuzzy_for_options'):
            raise NotImplementedError("Can't do fuzzy with RunDB yet.")

        keys = list(keys)  # Context used to pass a set
        if not keys:
            return []
        if len({k.lineage_hash for k in keys}) != 1:
            raise ValueError("find_several keys must have same lineage")
        if len({k.data_type for k in keys}) != 1:
            raise ValueError("find_several keys must have same data type")

        # Query, and later look up results, with values typed like the DB
        # field (ints for 'number') so the matches line up with ``keys``.
        run_ids = [self._run_id_value(k.run_id) for k in keys]
        run_query = {self.runid_field: {'$in': run_ids}}
        dq = self._data_query(keys[0])
        projection = {**dq, 'name': True, 'number': True}

        results_dict = {}
        for doc in self.collection.find(
                {**run_query, **dq}, projection=projection):
            datum = doc['data'][0]
            results_dict[doc[self.runid_field]] = (datum['protocol'],
                                                   datum['location'])
        return [results_dict.get(run_id, False) for run_id in run_ids]

    def _list_available(self,
                        key: strax.DataKey,
                        allow_incomplete,
                        fuzzy_for,
                        fuzzy_for_options):
        """Return the ``runid_field`` values of all runs that have ``key``'s data here."""
        if fuzzy_for or fuzzy_for_options:
            raise NotImplementedError("Can't do fuzzy with RunDB yet.")

        dq = self._data_query(key)
        cursor = self.collection.find(
            dq,
            projection=[self.runid_field])
        return [x[self.runid_field] for x in cursor]

    def _scan_runs(self, store_fields):
        """Yield every run document restricted to ``store_fields``, without ``_id``.

        When ``reader_ini_name_is_mode`` is set, the 'mode' field is replaced
        by 'reader.ini.name' ('' if absent).
        """
        cursor = self.collection.find(
            filter={},
            projection=strax.to_str_tuple(
                list(store_fields) + ['reader.ini.name']))
        # Cursor.count() no longer exists in PyMongo 4; count_documents is
        # available from PyMongo 3.7 on.
        n_runs = self.collection.count_documents({})
        for doc in tqdm(cursor, desc='Fetching run info from MongoDB',
                        total=n_runs):
            del doc['_id']
            if self.reader_ini_name_is_mode:
                doc['mode'] = \
                    doc.get('reader', {}).get('ini', {}).get('name', '')
            yield doc