ricochet.monitoring.ricochetdb module¶
- class ricochet.monitoring.ricochetdb.CMCPADevice(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- breakdownReplyData(rawdata, data)¶
Take in raw ptc data, and return a dictionary.
The dictionary keys are the data labels, the dictionary values are the data in floats or ints.
- Parameters:
rawdata – The byte array received from the CPA
data – The dictionnary to fill out with parsed data
- static buildRegistersQuery()¶
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.CPX200Device(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- async static cpx200dp_cmd(device_ip_address, device_port, cmd)¶
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.CryoConceptDevice(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.DaqLogger¶
Bases:
object
Class sending DAQ messages to the database.
Once an instance is created, you can send a list of messages using the insert([<dict objects>]) method :
#Connection to the database, done only once from ricochet.monitoring.ricochetdb import Database db = Database.get_database() db.set_address("mongodb://<USER>:<PASSWORD>@ricochetdb.ip2i.in2p3.fr:27017/?authSource=ricochet-lio_daq&ssl=true") db.set_db_name("ricochet-lio_daq") #Managing messages from datetime import datetime as dt import datetime from ricochet.monitoring.ricochetdb import DaqLogger logger = DaqLogger() messages = [] messages.append({'topic':'topic1','timestamp':dt.utcnow().timestamp(),'val1':5,'myfield_2':'text_value1'}) messages.append({'topic':'topic4','timestamp':dt.utcnow().timestamp(),'val1':6,'myfield_8':'text_value2'}) logger.insert(messages) #Deconnection from the DB Database.get_database().close()
The insert() method will raise a RuntimeError exception if mandatory fields are not found in the messages, in which case none of the messages will be inserted in the DB.
- insert(messages)¶
Insert messages in the database.
- Parameters:
messages – A list of dictionnaries. Each dictionnary must contain the topic and timestamp fields.
- Raises:
RuntimeError – all insertions are aborted if a mandatory field is missing.
- class ricochet.monitoring.ricochetdb.DataFetcher(mapping_file='./mapping.json')¶
Bases:
object
Retrieve data from different datasources and upload them to the database (datasources can be MMR3, MGC3 or CRYO devices).
- add_datasource(device_type, device_ip_address, file_location='/www/device.htm')¶
Adds a datasource to be queried by the DataFetcher.
- Parameters:
device_type (str) – Type of the device, can be CMCPA, MMR3, MGC3, CRYO or JSON-CRYO.
device_ip_address (str) – IP address of the device.
file_location (str) – Address of the html file to parse on the HTTP server. If set to ‘udp’, the device will be queried using the embeded UDP server.
- append_from_file(file_name)¶
Append the data read in the JSON file to the existing data
- Parameters:
file_name (str) – Name of the input file
- fetch(data_type='imacrt')¶
Query the registered datasources and store the results in the DataFetcher object.
- Parameters:
data_type (str) – Type of devices to query. If ‘imacrt’, all MMR3 and MGC3 devices will be queried. If ‘cryostat”, all CRYO devices will be queries.
The UTC timestamp associated with the data is coming from the database server.
- move_to_file(file_name)¶
Move the data stored in the DataFetcher object in a local JSON file.
- Parameters:
file_name (str) – Name of the output file
- upload(collection)¶
Uploads the data stored in the DataFetcher object to the configured database.
- Parameters:
collection (str) – The name of the collection in the database
- class ricochet.monitoring.ricochetdb.Database¶
Bases:
object
Connection to a MongoDB server. Allows to connect, get informations and save data in the database. This is a singleton class, the handler should always be retrieved through the get_database() method.
Configuring the connection:
from ricochetdb import Database db = Database.get_database() db.set_address("mongodb://<USER>:<PASSWORD>@lyomanoirsrv01.in2p3.fr:27017/<DATABASE_NAME>") db.set_db_name("<DATABASE_NAME>")
You can then retrieve the connection handler using
Database.get_database()
from anywhere in the code.
Sending a request:
Get 5 records from mmr3-2020 where timestamp is greater than 14-04-2020 :
myquery = { "timestamp": { "$gt": dt.datetime(2020,4,14) } } for r in Database.get_database().get_collection("mmr3-2020").find(myquery).limit(5): print(i)
- close()¶
Close existing connection to the database
- get_collection(col)¶
Get a Collection object from a database collection
- Parameters:
col (str) – The name of the collection
- classmethod get_database()¶
Class Method. Retrieve the handler on the database object.
- get_db_timestamp()¶
Get the date and time from the database server.
- Returns:
Returns the date and time from the database server as a
Datetime.Datetime
object.- Raises:
RuntimeError – Raised if no database server is configured.
- handler = None¶
- is_configured()¶
Check if the Database connexion is configured
- Returns:
True if the connexion is configured, Flase otherwise
- set_address(address)¶
Set the address of the mongo server
- Parameters:
address (str) – A string using the syntax mongodb://<USER>:<PASSWORD>@myserver.domaine.fr:<PORT>/<DB_NAME>
- set_db_name(name)¶
Set the name of the database on which you whish to connect
- Parameters:
name (str) – The name of the database
- upload(data, collection, bulk_size=1000)¶
Upload data to a collection in the database.
- Parameters:
data (list(str)) – A list of
dict
objectscollection (str) – Name of the collection to insert data
bulk_size (int) – Number of documents to insert within a bulk
- class ricochet.monitoring.ricochetdb.DatedRecordManager(collection_name)¶
Bases:
RecordManager
- start(record_name, start_date=None)¶
Add a start_date field to the record in the database, containing the current UTC date.
- Parameters:
record_name – The name of the record
start_date – A datetime object containing the starting date, default is current date
- Raises:
RuntimeError – all insertions are aborted if the record does not exist or is already started.
- stop(record_name, stop_date=None)¶
Add n stop_date field to the record in the database, containing the current UTC date.
- Parameters:
record_name – The name of the record
stop_date – An UTC datetime object with timezone info containing the end date, default is current date
- Raises:
RuntimeError – all insertions are aborted if the record does not exist or is already stopped.
- class ricochet.monitoring.ricochetdb.DetectorManager(collection_name)¶
Bases:
RecordManager
- create(record_dict)¶
Insert a new record in the database.
- Parameters:
record_dict – A dictionnary containing informations about the new record (name, acq_config, channel_config, status).
- Raises:
RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.
- class ricochet.monitoring.ricochetdb.Device(ip_addr: str, prot: str, cat: str)¶
Bases:
ABC
Abstract class managing datasource devices (MMR3, MGC3, Crystat, …)
- abstract get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.DummyImacrtServer(m='MMR3', ip_address='127.0.0.1', p=12001)¶
Bases:
Thread
Class emulating an IMacRT UDP server for test purpose
- run()¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- class ricochet.monitoring.ricochetdb.JSONCMCPADevice(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.JSONScaleDevice(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.MGC3Device(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.MMR3Device(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.MP710508Device(ip_addr: str, prot: str, cat: str)¶
Bases:
Device
- get_data(results, udp_buffer=[])¶
Query the device and store the data in the results list.
- Parameters:
results – A list where the data will be recorded.
udp_buffer – An optional buffer used by UDP communication
- class ricochet.monitoring.ricochetdb.ProcessingDataLoader¶
Bases:
object
Parse JSON files generated by the processing and upload them in the database (the connection must already be configured). All timestamps in the JSON files must be in UTC.
- parse_folder(folder_name, regexp, delete_files=True)¶
Parse all files matching the regexp in the given folder and upload them to the database.
- Parameters:
folder_name – Folder where the files are stored.
regexp (string) – Regular expression to select the files in the folder (ex : “[0-9a-z]{8}.json”).
delete_files (boolean) – if True (default), delete correctly processed files.
For each stream, we check that the data generation timestamp is newer than the one contained in the database. If not, the file is not treated (data already in the database).
- class ricochet.monitoring.ricochetdb.ProcessingRecord(file_name)¶
Bases:
object
Handles data coming from a processing file.
- stream_idstr
The name of the stream.
- file_ts
Datetime.Datetime
The timestamp of the data generation (UTC).
- recordslist of dict
The data contained in the file
- class ricochet.monitoring.ricochetdb.RecordManager(collection_name)¶
Bases:
ABC
Abstract class managing the creation and storage of documents in the database
Once an instance is created you can insert new records or update existing ones.
- abstract create(record_dict)¶
Insert a new record in the database.
- Parameters:
record_dict – A dictionnary containing informations about the new record (name, acq_config, channel_config, status).
- Raises:
RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.
- class ricochet.monitoring.ricochetdb.RunManager(collection_name, dcn)¶
Bases:
DatedRecordManager
Connection to the database, done only once at the beginning :
from ricochet.monitoring.ricochetdb import Database db = Database.get_database() db.set_address("mongodb://<USER>:<PASSWORD>@<SERVER>:<PORT>") db.set_db_name("<DB_NAME>")
Creation of a run (the dictionnary given to the create method must contain the fields ‘name’, ‘detector’) :
from datetime import datetime as dt import datetime from dateutil import parser from ricochet.monitoring.ricochetdb import RunManager sr = RunManager("lio_run_info", "lio_detector_info") try: start = parser.parse("2022-05-11T14:35:30.412") # Must be in UTC! stop = parser.parse("2022-05-11T14:43:48.586") sr.create({'name': 'testRun01', 'detector': [{'name':'RED15'}, {'name':'RED234'}], 'start_date':start, 'stop_date':stop, 'description': ''}) except RuntimeError as e: print(e)
You can create a run without a stop date and stop it later with :
from datetime import datetime as dt import datetime from dateutil import parser from ricochet.monitoring.ricochetdb import RunManager sr = RunManager("lio_run_info", "lio_detector_info") try: stop = parser.parse("2022-05-11T14:43:48.586") sr.stop('testRun01', stop) #sr.stop('testRun01') #also possible : will use the current UTC date except RuntimeError as e: print(e)
At the end of the program : disconnecting from DB :
Database.get_database().close()
The insert() method will raise a RuntimeError exception if mandatory fields are not found in the dictionnary, in which case the run will not be inserted in the DB.
- create(record_dict)¶
Insert a new record in the database.
- Parameters:
record_dict – A dictionnary containing informations about the new record (name, acq_config, channel_config, status).
- Raises:
RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.
- class ricochet.monitoring.ricochetdb.StreamManager(collection_name, dcn)¶
Bases:
DatedRecordManager
Connection to the database, done only once at the beginning :
from ricochet.monitoring.ricochetdb import Database db = Database.get_database() db.set_address("mongodb://<USER>:<PASSWORD>@<SERVER>:<PORT>") db.set_db_name("<DB_NAME>")
Creation of a stream (the dictionnary given to the create method must contain the fields ‘name’, ‘acq_config’, ‘channel_config’ and ‘comment’) :
from datetime import datetime as dt import datetime from dateutil import parser from ricochet.monitoring.ricochetdb import StreamManager sm = StreamManager("lio_stream_info", "lio_detector_info") try: start = parser.parse("2022-05-11T14:35:30.412") stop = parser.parse("2022-05-11T14:43:48.586") sm.create({'name': 'testStream001', 'acq_config': {'modulation': True, 'd2': 250, 'd3': 400, 'Rt': 1, 'maintenance': False}, 'channel_config': [{'det_name': 'FID1', 'chan_name': 'ChalA', 'amplitude': 0.11, 'compensation': 0.771, 'correction': 38, 'gain': 1}, {'det_name': 'FID1', 'chan_name': 'ChalB', 'amplitude': 0.11, 'compensation': 0.624, 'correction': 35, 'gain': 1}], 'start_date':start, 'stop_date':stop, 'comment': '', 'status': 'good'}) except RuntimeError as e: print(e)
Adding an history record using an existing StreamManager object sm:
ts = dt.utcnow() sm.add_history_record(ts,'testStream001','FID1','processed','87B5E5A','New processing with corrected code')
At the end of the program : disconnecting from DB :
Database.get_database().close()
The insert() method will raise a RuntimeError exception if mandatory fields are not found in the dictionnary, in which case the stream will not be inserted in the DB.
- add_detector(stream_name: str, channel_config: dict, acq_config: dict, bbox_status: dict)¶
Add informations about a detector to an existing stream. :param stream_name: The name of the stream :param channel_config: dictionnary containing data about the configuration of a detector present in the stream :param acq_config: dictionnary containing data about the list of detectors in the stream :param bbox_status: dictionnary containing data about the status of bolo-boxes in the stream :raises RuntimeError: insertion is aborted if the stream does not exist, is still running or the detector is already known in the stream.
- add_history_record(timestamp, stream_name, detector_name, label, code_commit, description='', processing_status=None, processing_conf=None)¶
Add a new record to the history of the stream.
- Parameters:
timestamp – Date of the record, this must be a
Datetime.Datetime
objectstream_name – The name of the stream
detector_name – The name of the detector
label – The label of the history record (processed, analyzed, …)
code_commit – The git commit of the code used
description – Any other useful information
processing_status – Optional float between 0 and 1 giving the proportion of processing success
processing_conf – Optional dictionary giving processing configuration
- Raises:
RuntimeError – insertion is aborted if the stream does not exist or is still running.
- add_temperature(stream_name, mean_temp, std_temp)¶
Add into the stream record informations about the temperature during the stream, taken from the MMR3 records. :param stream_name: The name of the stream :param mean_temp: Mean 4K temperature during the stream :param std_temp: Standard deviation of the temperature during the run :raises RuntimeError: insertion is aborted if the stream does not exist or is still running.
- create(record_dict)¶
Insert a new record in the database.
- Parameters:
record_dict – A dictionnary containing informations about the new record (name, acq_config, channel_config, status).
- Raises:
RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.
- class ricochet.monitoring.ricochetdb.UdpImacrtCollector(data_buffer, ip_address='0.0.0.0', p=12000)¶
Bases:
Thread
Class listening to and storing UDP messages from iMACRT
- run()¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- stop_after_next_message()¶
- ricochet.monitoring.ricochetdb.is_date(value)¶
Check if the given string is a date
- ricochet.monitoring.ricochetdb.is_float(value)¶
Check if the given string is a float
- ricochet.monitoring.ricochetdb.is_int(value)¶
Check if the given string is an int
- ricochet.monitoring.ricochetdb.query_imacrt_udp(ip, message)¶
- ricochet.monitoring.ricochetdb.send_udp_message(ip, port, message)¶
- ricochet.monitoring.ricochetdb.to_local(utc_date)¶