ricochet.monitoring.ricochetdb module

class ricochet.monitoring.ricochetdb.CMCPADevice(ip_addr: str, prot: str, cat: str)

Bases: Device

breakdownReplyData(rawdata, data)

Take raw PTC data and return a dictionary.

The dictionary keys are the data labels; the values are the data as floats or ints.

Parameters:
  • rawdata – The byte array received from the CPA

  • data – The dictionary to fill out with parsed data
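As an illustration of this kind of parsing, the sketch below unpacks a byte array into a dictionary of labelled numeric values. The register layout, the labels and the byte order are hypothetical and are not taken from the CPA protocol; only the idea of filling a caller-supplied dictionary mirrors the method described above.

```python
import struct

# Hypothetical layout: (label, struct format, scale factor). Not the real CPA map.
HYPOTHETICAL_LAYOUT = [
    ("coolant_in_temp", ">h", 0.1),   # signed 16-bit, tenths of a degree
    ("coolant_out_temp", ">h", 0.1),
    ("error_code", ">H", 1),          # unsigned 16-bit, kept as an int
]

def breakdown_reply_sketch(rawdata: bytes, data: dict) -> dict:
    """Fill `data` with labelled values decoded from `rawdata`."""
    offset = 0
    for label, fmt, scale in HYPOTHETICAL_LAYOUT:
        (value,) = struct.unpack_from(fmt, rawdata, offset)
        offset += struct.calcsize(fmt)
        # An integer scale keeps an int, a fractional scale yields a float
        data[label] = value * scale
    return data

raw = struct.pack(">hhH", 215, 308, 0)
parsed = breakdown_reply_sketch(raw, {})
```

The function returns the same dictionary it was given, so it can be used either for its return value or for its side effect on `data`.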

static buildRegistersQuery()

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.CPX200Device(ip_addr: str, prot: str, cat: str)

Bases: Device

async static cpx200dp_cmd(device_ip_address, device_port, cmd)

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.CryoConceptDevice(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.DaqLogger

Bases: object

Class sending DAQ messages to the database.

Once an instance is created, you can send a list of messages using the insert([<dict objects>]) method:

# Connection to the database, done only once
from ricochet.monitoring.ricochetdb import Database
db = Database.get_database()
db.set_address("mongodb://<USER>:<PASSWORD>@ricochetdb.ip2i.in2p3.fr:27017/?authSource=ricochet-lio_daq&ssl=true")
db.set_db_name("ricochet-lio_daq")

# Managing messages
from datetime import datetime as dt, timezone
from ricochet.monitoring.ricochetdb import DaqLogger
logger = DaqLogger()
messages = []
# Use an aware UTC datetime: timestamp() on a naive utcnow() value would be
# interpreted as local time and shift the result.
messages.append({'topic': 'topic1', 'timestamp': dt.now(timezone.utc).timestamp(), 'val1': 5, 'myfield_2': 'text_value1'})
messages.append({'topic': 'topic4', 'timestamp': dt.now(timezone.utc).timestamp(), 'val1': 6, 'myfield_8': 'text_value2'})
logger.insert(messages)

# Disconnect from the DB
Database.get_database().close()

The insert() method will raise a RuntimeError exception if mandatory fields are not found in the messages, in which case none of the messages will be inserted in the DB.

insert(messages)

Insert messages in the database.

Parameters:

messages – A list of dictionaries. Each dictionary must contain the topic and timestamp fields.

Raises:

RuntimeError – all insertions are aborted if a mandatory field is missing.
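The all-or-nothing behaviour described above can be sketched as follows. This is a stand-in that writes to a plain list, not the library's implementation; the helper names `check_messages` and `insert_sketch` are hypothetical.

```python
MANDATORY_FIELDS = ("topic", "timestamp")

def check_messages(messages):
    """Raise RuntimeError if any message lacks a mandatory field."""
    for index, message in enumerate(messages):
        missing = [f for f in MANDATORY_FIELDS if f not in message]
        if missing:
            raise RuntimeError(f"message {index} is missing {missing}")

def insert_sketch(messages, storage):
    # Validate the whole batch first so that one bad message aborts everything
    check_messages(messages)
    storage.extend(messages)

storage = []
insert_sketch([{"topic": "topic1", "timestamp": 1652279730.0, "val1": 5}], storage)
try:
    # The second message has no topic or timestamp, so nothing is stored
    insert_sketch([{"topic": "topic4", "timestamp": 1652279731.0}, {"val1": 6}], storage)
    aborted = False
except RuntimeError:
    aborted = True
```

Validating before writing is what guarantees that a valid message in the same batch as an invalid one is not inserted on its own.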

class ricochet.monitoring.ricochetdb.DataFetcher(mapping_file='./mapping.json')

Bases: object

Retrieve data from different datasources and upload them to the database (datasources can be MMR3, MGC3 or CRYO devices).

add_datasource(device_type, device_ip_address, file_location='/www/device.htm')

Adds a datasource to be queried by the DataFetcher.

Parameters:
  • device_type (str) – Type of the device, can be CMCPA, MMR3, MGC3, CRYO or JSON-CRYO.

  • device_ip_address (str) – IP address of the device.

  • file_location (str) – Address of the html file to parse on the HTTP server. If set to ‘udp’, the device will be queried using the embedded UDP server.

append_from_file(file_name)

Append the data read from the JSON file to the existing data.

Parameters:

file_name (str) – Name of the input file

fetch(data_type='imacrt')

Query the registered datasources and store the results in the DataFetcher object.

Parameters:

data_type (str) – Type of devices to query. If ‘imacrt’, all MMR3 and MGC3 devices will be queried. If ‘cryostat’, all CRYO devices will be queried.

The UTC timestamp associated with the data comes from the database server.

move_to_file(file_name)

Move the data stored in the DataFetcher object to a local JSON file.

Parameters:

file_name (str) – Name of the output file
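The pairing of move_to_file and append_from_file suggests a buffering pattern: park the data on disk, then reload it later. The sketch below shows one way this could work with the standard json module. It assumes that "move" means the in-memory data is emptied after writing, which the text does not state explicitly; the function names and the record content are hypothetical.

```python
import json
import os
import tempfile

def move_to_file_sketch(records, file_name):
    """Write the buffered records to a JSON file, then empty the buffer."""
    with open(file_name, "w", encoding="utf-8") as handle:
        json.dump(records, handle)
    records.clear()

def append_from_file_sketch(records, file_name):
    """Read records back from a JSON file and add them to the buffer."""
    with open(file_name, encoding="utf-8") as handle:
        records.extend(json.load(handle))

buffer = [{"device": "MMR3", "value": 4.2}]
path = os.path.join(tempfile.mkdtemp(), "pending.json")
move_to_file_sketch(buffer, path)
emptied = len(buffer) == 0
append_from_file_sketch(buffer, path)
```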

upload(collection)

Uploads the data stored in the DataFetcher object to the configured database.

Parameters:

collection (str) – The name of the collection in the database

class ricochet.monitoring.ricochetdb.Database

Bases: object

Connection to a MongoDB server. Allows you to connect, retrieve information and save data in the database. This is a singleton class: the handler should always be retrieved through the get_database() method.

Configuring the connection:

from ricochet.monitoring.ricochetdb import Database
db = Database.get_database()
db.set_address("mongodb://<USER>:<PASSWORD>@lyomanoirsrv01.in2p3.fr:27017/<DATABASE_NAME>")
db.set_db_name("<DATABASE_NAME>")

You can then retrieve the connection handler using

Database.get_database()

from anywhere in the code.
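The singleton access pattern can be illustrated with a small stand-in class. This is a sketch of the general technique, not the library's code: `DatabaseSketch` is hypothetical, and only the names get_database, handler, set_address, set_db_name and is_configured are borrowed from this page.

```python
class DatabaseSketch:
    """Stand-in showing the singleton access pattern, not the real class."""

    handler = None  # shared instance, created on first access

    def __init__(self):
        self.address = None
        self.db_name = None

    @classmethod
    def get_database(cls):
        # Create the instance once, then always hand back the same object
        if cls.handler is None:
            cls.handler = cls()
        return cls.handler

    def set_address(self, address):
        self.address = address

    def set_db_name(self, name):
        self.db_name = name

    def is_configured(self):
        return self.address is not None and self.db_name is not None

db = DatabaseSketch.get_database()
db.set_address("mongodb://<USER>:<PASSWORD>@<SERVER>:<PORT>")
db.set_db_name("<DB_NAME>")
same_object = DatabaseSketch.get_database() is db
```

Because every call returns the same object, the configuration set once at start-up is visible wherever the handler is retrieved.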

Sending a request:

Get 5 records from mmr3-2020 where the timestamp is later than 14-04-2020:

import datetime as dt

myquery = { "timestamp": { "$gt": dt.datetime(2020, 4, 14) } }
for r in Database.get_database().get_collection("mmr3-2020").find(myquery).limit(5):
    print(r)

close()

Close existing connection to the database

get_collection(col)

Get a Collection object from a database collection

Parameters:

col (str) – The name of the collection

classmethod get_database()

Class method. Retrieve the handler of the database object.

get_db_timestamp()

Get the date and time from the database server.

Returns:

The date and time from the database server, as a datetime.datetime object.

Raises:

RuntimeError – Raised if no database server is configured.

handler = None

is_configured()

Check if the Database connection is configured

Returns:

True if the connection is configured, False otherwise

set_address(address)

Set the address of the mongo server

Parameters:

address (str) – A string using the syntax mongodb://<USER>:<PASSWORD>@myserver.domaine.fr:<PORT>/<DB_NAME>

set_db_name(name)

Set the name of the database to which you wish to connect

Parameters:

name (str) – The name of the database

upload(data, collection, bulk_size=1000)

Upload data to a collection in the database.

Parameters:
  • data (list(dict)) – A list of dict objects

  • collection (str) – Name of the collection into which the data is inserted

  • bulk_size (int) – Number of documents to insert within a bulk
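To make the role of bulk_size concrete, the sketch below splits a list of documents into slices of at most bulk_size items. How the library groups documents internally is not documented here, so `iter_bulks` is a hypothetical helper showing the general idea only.

```python
def iter_bulks(data, bulk_size=1000):
    """Yield successive slices of `data` holding at most `bulk_size` items."""
    if bulk_size < 1:
        raise ValueError("bulk_size must be a positive integer")
    for start in range(0, len(data), bulk_size):
        yield data[start:start + bulk_size]

documents = [{"index": i} for i in range(2500)]
bulk_lengths = [len(bulk) for bulk in iter_bulks(documents, bulk_size=1000)]
```

With PyMongo, each slice could then be passed to Collection.insert_many, which keeps the size of every request bounded regardless of how much data has accumulated.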

class ricochet.monitoring.ricochetdb.DatedRecordManager(collection_name)

Bases: RecordManager

start(record_name, start_date=None)

Add a start_date field to the record in the database, containing the given date or, by default, the current UTC date.

Parameters:
  • record_name – The name of the record

  • start_date – A datetime object containing the starting date, default is current date

Raises:

RuntimeError – all insertions are aborted if the record does not exist or is already started.

stop(record_name, stop_date=None)

Add a stop_date field to the record in the database, containing the given date or, by default, the current UTC date.

Parameters:
  • record_name – The name of the record

  • stop_date – A UTC datetime object with timezone info containing the end date, default is current date

Raises:

RuntimeError – all insertions are aborted if the record does not exist or is already stopped.
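Since stop_date is expected to be a UTC datetime carrying timezone info, a short sketch of two standard-library ways to build one may help. The variable names are illustrative; nothing here calls the library itself.

```python
from datetime import datetime, timezone

# Current time as a timezone-aware UTC datetime
now_utc = datetime.now(timezone.utc)

# A naive ISO timestamp that is known to be in UTC, made timezone-aware
naive = datetime.fromisoformat("2022-05-11T14:43:48.586")
stop_date = naive.replace(tzinfo=timezone.utc)
```

Note that replace(tzinfo=...) only labels the value as UTC without shifting it, so it is appropriate only when the naive value is already expressed in UTC.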

class ricochet.monitoring.ricochetdb.DetectorManager(collection_name)

Bases: RecordManager

create(record_dict)

Insert a new record in the database.

Parameters:

record_dict – A dictionary containing information about the new record (name, acq_config, channel_config, status).

Raises:

RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.

class ricochet.monitoring.ricochetdb.Device(ip_addr: str, prot: str, cat: str)

Bases: ABC

Abstract class managing datasource devices (MMR3, MGC3, Cryostat, …)

abstract get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication
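A new device type would be added by subclassing the abstract class and implementing get_data. The sketch below reproduces that structure with a stand-in base class, since the real one needs hardware to be useful. `DeviceSketch` and `FakeThermometer` are hypothetical, as are the argument values and the shape of the recorded reading. The sketch also uses None rather than a list as the default for udp_buffer, a deliberate deviation from the signature above that avoids sharing one default list between calls.

```python
from abc import ABC, abstractmethod

class DeviceSketch(ABC):
    """Stand-in for the abstract base class, with the same constructor shape."""

    def __init__(self, ip_addr: str, prot: str, cat: str):
        self.ip_addr = ip_addr
        self.prot = prot
        self.cat = cat

    @abstractmethod
    def get_data(self, results, udp_buffer=None):
        """Query the device and append readings to `results`."""

class FakeThermometer(DeviceSketch):
    """Hypothetical device returning a fixed reading instead of querying hardware."""

    def get_data(self, results, udp_buffer=None):
        results.append({"device": self.ip_addr, "category": self.cat, "T": 4.2})

results = []
FakeThermometer("192.0.2.10", "udp", "cryostat").get_data(results)

# The abstract base cannot be instantiated directly
try:
    DeviceSketch("192.0.2.10", "udp", "cryostat")
    abstract_enforced = False
except TypeError:
    abstract_enforced = True
```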

class ricochet.monitoring.ricochetdb.DummyImacrtServer(m='MMR3', ip_address='127.0.0.1', p=12001)

Bases: Thread

Class emulating an iMACRT UDP server for testing purposes

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class ricochet.monitoring.ricochetdb.JSONCMCPADevice(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.JSONScaleDevice(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.MGC3Device(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.MMR3Device(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.MP710508Device(ip_addr: str, prot: str, cat: str)

Bases: Device

get_data(results, udp_buffer=[])

Query the device and store the data in the results list.

Parameters:
  • results – A list where the data will be recorded.

  • udp_buffer – An optional buffer used by UDP communication

class ricochet.monitoring.ricochetdb.ProcessingDataLoader

Bases: object

Parse JSON files generated by the processing and upload them in the database (the connection must already be configured). All timestamps in the JSON files must be in UTC.

parse_folder(folder_name, regexp, delete_files=True)

Parse all files matching the regexp in the given folder and upload them to the database.

Parameters:
  • folder_name – Folder where the files are stored.

  • regexp (string) – Regular expression to select the files in the folder (e.g. “[0-9a-z]{8}.json”).

  • delete_files (boolean) – if True (default), delete correctly processed files.

For each stream, we check that the data generation timestamp is newer than the one contained in the database. If not, the file is skipped (the data is already in the database).
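The two checks described above, file selection by regular expression and the timestamp comparison, can be sketched with the standard library. Whether the library matches the whole file name or only a prefix is not documented, so the use of fullmatch is an assumption, and `select_files` and `is_newer` are hypothetical helpers. The pattern below escapes the dot, because an unescaped "." matches any character.

```python
import os
import re
import tempfile
from datetime import datetime, timezone

def select_files(folder_name, regexp):
    """Return the sorted names in `folder_name` that fully match `regexp`."""
    pattern = re.compile(regexp)
    return sorted(n for n in os.listdir(folder_name) if pattern.fullmatch(n))

def is_newer(file_ts, db_ts):
    """True if the file was generated after the data already stored."""
    return db_ts is None or file_ts > db_ts

# Build a throwaway folder with one matching and two non-matching names
folder = tempfile.mkdtemp()
for name in ("0a1b2c3d.json", "notes.txt", "ABCDEFGH.json"):
    open(os.path.join(folder, name), "w").close()

selected = select_files(folder, r"[0-9a-z]{8}\.json")
stored = datetime(2022, 5, 11, 14, 0, tzinfo=timezone.utc)
fresh = is_newer(datetime(2022, 5, 11, 15, 0, tzinfo=timezone.utc), stored)
```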

class ricochet.monitoring.ricochetdb.ProcessingRecord(file_name)

Bases: object

Handles data coming from a processing file.

stream_id : str

The name of the stream.

file_ts : datetime.datetime

The timestamp of the data generation (UTC).

records : list of dict

The data contained in the file.

class ricochet.monitoring.ricochetdb.RecordManager(collection_name)

Bases: ABC

Abstract class managing the creation and storage of documents in the database

Once an instance is created you can insert new records or update existing ones.

abstract create(record_dict)

Insert a new record in the database.

Parameters:

record_dict – A dictionary containing information about the new record (name, acq_config, channel_config, status).

Raises:

RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.

class ricochet.monitoring.ricochetdb.RunManager(collection_name, dcn)

Bases: DatedRecordManager

Connection to the database, done only once at the beginning:

from ricochet.monitoring.ricochetdb import Database
db = Database.get_database()
db.set_address("mongodb://<USER>:<PASSWORD>@<SERVER>:<PORT>")
db.set_db_name("<DB_NAME>")

Creation of a run (the dictionary given to the create method must contain the fields ‘name’ and ‘detector’):

from dateutil import parser
from ricochet.monitoring.ricochetdb import RunManager
sr = RunManager("lio_run_info", "lio_detector_info")
try:
    start = parser.parse("2022-05-11T14:35:30.412") # Must be in UTC!
    stop = parser.parse("2022-05-11T14:43:48.586")
    sr.create({'name': 'testRun01', 'detector': [{'name':'RED15'}, {'name':'RED234'}], 'start_date':start, 'stop_date':stop, 'description': ''})
except RuntimeError as e:
    print(e)

You can create a run without a stop date and stop it later with:

from dateutil import parser
from ricochet.monitoring.ricochetdb import RunManager
sr = RunManager("lio_run_info", "lio_detector_info")
try:
    stop = parser.parse("2022-05-11T14:43:48.586")
    sr.stop('testRun01', stop)
    # sr.stop('testRun01')  # also possible: will use the current UTC date
except RuntimeError as e:
    print(e)

At the end of the program, disconnect from the DB:

Database.get_database().close()

The create() method will raise a RuntimeError exception if mandatory fields are not found in the dictionary, in which case the run will not be inserted in the DB.

create(record_dict)

Insert a new record in the database.

Parameters:

record_dict – A dictionary containing information about the new record (name, acq_config, channel_config, status).

Raises:

RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.

class ricochet.monitoring.ricochetdb.StreamManager(collection_name, dcn)

Bases: DatedRecordManager

Connection to the database, done only once at the beginning:

from ricochet.monitoring.ricochetdb import Database
db = Database.get_database()
db.set_address("mongodb://<USER>:<PASSWORD>@<SERVER>:<PORT>")
db.set_db_name("<DB_NAME>")

Creation of a stream (the dictionary given to the create method must contain the fields ‘name’, ‘acq_config’, ‘channel_config’ and ‘comment’):

from datetime import datetime as dt
from dateutil import parser
from ricochet.monitoring.ricochetdb import StreamManager
sm = StreamManager("lio_stream_info", "lio_detector_info")
try:
    start = parser.parse("2022-05-11T14:35:30.412")
    stop = parser.parse("2022-05-11T14:43:48.586")
    sm.create({
        'name': 'testStream001',
        'acq_config': {'modulation': True, 'd2': 250, 'd3': 400, 'Rt': 1, 'maintenance': False},
        'channel_config': [
            {'det_name': 'FID1', 'chan_name': 'ChalA', 'amplitude': 0.11, 'compensation': 0.771, 'correction': 38, 'gain': 1},
            {'det_name': 'FID1', 'chan_name': 'ChalB', 'amplitude': 0.11, 'compensation': 0.624, 'correction': 35, 'gain': 1},
        ],
        'start_date': start, 'stop_date': stop, 'comment': '', 'status': 'good',
    })
except RuntimeError as e:
    print(e)

Adding a history record using an existing StreamManager object sm:

ts = dt.utcnow()
sm.add_history_record(ts,'testStream001','FID1','processed','87B5E5A','New processing with corrected code')

At the end of the program, disconnect from the DB:

Database.get_database().close()

The create() method will raise a RuntimeError exception if mandatory fields are not found in the dictionary, in which case the stream will not be inserted in the DB.

add_detector(stream_name: str, channel_config: dict, acq_config: dict, bbox_status: dict)

Add information about a detector to an existing stream.

Parameters:
  • stream_name – The name of the stream

  • channel_config – Dictionary containing data about the configuration of a detector present in the stream

  • acq_config – Dictionary containing data about the list of detectors in the stream

  • bbox_status – Dictionary containing data about the status of bolo-boxes in the stream

Raises:

RuntimeError – insertion is aborted if the stream does not exist, is still running or the detector is already known in the stream.

add_history_record(timestamp, stream_name, detector_name, label, code_commit, description='', processing_status=None, processing_conf=None)

Add a new record to the history of the stream.

Parameters:
  • timestamp – Date of the record, this must be a datetime.datetime object

  • stream_name – The name of the stream

  • detector_name – The name of the detector

  • label – The label of the history record (processed, analyzed, …)

  • code_commit – The git commit of the code used

  • description – Any other useful information

  • processing_status – Optional float between 0 and 1 giving the proportion of processing success

  • processing_conf – Optional dictionary giving processing configuration

Raises:

RuntimeError – insertion is aborted if the stream does not exist or is still running.

add_temperature(stream_name, mean_temp, std_temp)

Add information about the temperature during the stream, taken from the MMR3 records, to the stream record.

Parameters:
  • stream_name – The name of the stream

  • mean_temp – Mean 4K temperature during the stream

  • std_temp – Standard deviation of the temperature during the stream

Raises:

RuntimeError – insertion is aborted if the stream does not exist or is still running.

create(record_dict)

Insert a new record in the database.

Parameters:

record_dict – A dictionary containing information about the new record (name, acq_config, channel_config, status).

Raises:

RuntimeError – insertion is aborted if a mandatory field is missing or a record is duplicated.

class ricochet.monitoring.ricochetdb.UdpImacrtCollector(data_buffer, ip_address='0.0.0.0', p=12000)

Bases: Thread

Class listening to and storing UDP messages from iMACRT

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
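As an illustration of overriding run() in a Thread subclass, the sketch below listens for UDP datagrams and stores them in a caller-supplied list until it is asked to stop after the next message. It is a generic stand-in, not the library's collector: the class name, the `port` argument, the 5-second timeout and the loopback address are all assumptions made so the example can run on its own.

```python
import socket
import threading

class UdpCollectorSketch(threading.Thread):
    """Store incoming UDP datagrams in a shared list until asked to stop."""

    def __init__(self, data_buffer, ip_address="127.0.0.1", port=0):
        super().__init__(daemon=True)
        self.data_buffer = data_buffer
        self._stop_requested = threading.Event()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.bind((ip_address, port))   # port 0 lets the OS pick a free port
        self._sock.settimeout(5.0)            # avoid blocking forever
        self.port = self._sock.getsockname()[1]

    def stop_after_next_message(self):
        self._stop_requested.set()

    def run(self):
        try:
            while True:
                try:
                    message, _sender = self._sock.recvfrom(4096)
                except socket.timeout:
                    break
                self.data_buffer.append(message)
                # The flag is only checked after a message has been stored
                if self._stop_requested.is_set():
                    break
        finally:
            self._sock.close()

received = []
collector = UdpCollectorSketch(received)
collector.start()
collector.stop_after_next_message()
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
    sender.sendto(b"T=4.2", ("127.0.0.1", collector.port))
collector.join(timeout=10)
```

Binding the socket in the constructor, before the thread starts, means a datagram sent right after start() is queued by the operating system rather than lost.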

stop_after_next_message()

ricochet.monitoring.ricochetdb.is_date(value)

Check if the given string is a date

ricochet.monitoring.ricochetdb.is_float(value)

Check if the given string is a float

ricochet.monitoring.ricochetdb.is_int(value)

Check if the given string is an int
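Helpers of this kind are commonly written by attempting the conversion and catching the failure. The sketch below shows that approach; it is not the library's implementation, the `_sketch` names are hypothetical, and the date check assumes ISO 8601 input because the formats accepted by is_date are not documented.

```python
from datetime import datetime

def is_int_sketch(value):
    """True if `value` can be converted by int()."""
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False

def is_float_sketch(value):
    """True if `value` can be converted by float()."""
    try:
        float(value)
        return True
    except (TypeError, ValueError):
        return False

def is_date_sketch(value):
    """True if `value` is an ISO 8601 date string (assumed format)."""
    try:
        datetime.fromisoformat(value)
        return True
    except (TypeError, ValueError):
        return False

checks = (
    is_int_sketch("42"),
    is_float_sketch("4.2e-3"),
    is_int_sketch("4.2"),
    is_date_sketch("2022-05-11T14:35:30"),
    is_date_sketch("RED15"),
)
```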

ricochet.monitoring.ricochetdb.query_imacrt_udp(ip, message)

ricochet.monitoring.ricochetdb.send_udp_message(ip, port, message)

ricochet.monitoring.ricochetdb.to_local(utc_date)