ricochet.monitoring.ricq module¶
- class ricochet.monitoring.ricq.DataFetcher¶
Bases:
object- Used to retrieve Ricochet slow control data from the ElasticSearch cluster. Configuration will be taken from environment variables :
$ES_SERVER : cluster address (ex: mycluster.my.domain:9001)
$ES_ROOT_CA_FILE : CA cert file
- Authentication is done either via basic auth :
$ES_USER : user name
$ES_PASSWORD : user password
- or via client certificate :
$ES_USER_CERT_FILE : user certificate file
$ES_USER_KEY_FILE : user key file
or via Kerberos : no additional environment variable, a token must exist on the system (kinit command)
- Example:
Retrieving cernox_still and cernox_50k values from the lio.temperature index for the 01/09/2019, one value per hour.
from datetime import datetime import ricochet.monitoring.ricq as ricq try: df = ricq.DataFetcher() df.fetch(["ricochet-lio.temperature%cernox_still","ricochet-lio.temperature%cernox_50k"] ,datetime(2019,9,1), datetime(2019,9,2), "1h") d = df.get_data() print(d) except RuntimeError as e: print(e) except ValueError as e: print(e)
datetime cernox_still cernox_50k 0 2019-09-01 00:00:00 0.965215 61.086022 1 2019-09-01 01:00:00 0.964848 61.088353 2 2019-09-01 02:00:00 0.965159 61.103076 3 2019-09-01 03:00:00 0.965289 61.118557 4 2019-09-01 04:00:00 0.965729 61.132924 5 2019-09-01 05:00:00 0.965760 61.143401 6 2019-09-01 06:00:00 0.965483 61.152281 7 2019-09-01 07:00:00 0.965573 61.156753 8 2019-09-01 08:00:00 0.965965 61.157790 9 2019-09-01 09:00:00 0.966453 61.159242 10 2019-09-01 10:00:00 0.966025 61.118044 11 2019-09-01 11:00:00 0.965200 61.073017 12 2019-09-01 12:00:00 0.964738 61.068119 13 2019-09-01 13:00:00 0.964744 61.083502 14 2019-09-01 14:00:00 0.964968 61.106286 15 2019-09-01 15:00:00 0.965372 61.121716 16 2019-09-01 16:00:00 0.965846 61.135635 17 2019-09-01 17:00:00 0.965594 61.148763 18 2019-09-01 18:00:00 0.966031 61.160616 19 2019-09-01 19:00:00 0.965769 61.160062 20 2019-09-01 20:00:00 0.965228 61.129484 21 2019-09-01 21:00:00 0.965299 61.104936 22 2019-09-01 22:00:00 0.965078 61.102760 23 2019-09-01 23:00:00 0.965169 61.116333
You can do whatever you want with the retrieved data. You can also directly export the data into a CSV file :
df.export_to_csv("data.csv")
Or create a default plot with the values :
df.plot()
If you need a calendar binning (values per weeks, months, years, …), check out the “p_interval_type” parameter of the fetch method.
- clear()¶
Clear the data cache
- disconnect()¶
Disconnection of the underlying connection pool
- empty()¶
Check if data are available
- Returns:
Returns True if no data are available
- export_to_csv(p_output_filename)¶
Export the data cache in a CSV file.
- Parameters:
p_output_filename – The name of the output CSV file.
- fetch(p_user_fields, p_start, p_stop, p_user_interval, p_interval_type='fixed')¶
Connect to the cluster and retrieve data.
- Parameters:
p_user_fields – A list of strings containing the names of the fields to retrieve and their index with the format “index%field”. Index can be either “ricochet-lio.temperature” or “lio.cryo”.
p_start – A
datetime.datetimedate describing the start of the data periodp_stop – A
datetime.datetimedate describing the end of the data periodp_user_interval – The binning of the data : seconds (s), minutes (m), hours (h), days (d), as in “30s”, “15m”, “30d”… The function will return an averaged value on the given interval.
p_interval_type – Type of time interval used to average data. Can be “fixed” in standard units (15m, 2h, 6d, …) or “calendar” (minute, hour, day, week, month, year)
- Raises:
ValueError – Raised if the format of the input date parameters is not known.
- get_data()¶
Returns the data cache
- Returns:
Returns a
pandas.DataFrameobject containing the retrieved data.
- get_data_period()¶
Returns the starting and ending dates of the data cache
- Returns:
Returns two
Datetime.Datetimeobjects containing the start and end dates, None if not available (no data cached).
- plot()¶
Create a default plot of the cached data.
- class ricochet.monitoring.ricq.ELSConnection¶
Bases:
objectConnection to an ElasticSearch instance. This is a singleton class, the handler should always be retrieved through the get_handler() method.
Retrieving a handler to the opensearch instance:
from ricochet.monitoring.ricq import ELSConnection h = ELSConnection.get_handler()
This can be used from anywhere in the code.
- classmethod disconnect()¶
- classmethod get_handler()¶
Class Method. Retrieve the handler on the opensearch connexion.
- classmethod get_instance()¶
Class Method. Retrieve the singleton instance of the class.
- instance = None¶
- class ricochet.monitoring.ricq.ExistsConstraint(field)¶
Bases:
Constraint
- class ricochet.monitoring.ricq.MatchAllConstraint¶
Bases:
Constraint
- class ricochet.monitoring.ricq.MatchConstraint(field, value)¶
Bases:
Constraint
- class ricochet.monitoring.ricq.Query¶
Bases:
ABCClass allowing to build queries and send them to the ElasticSearch REST API.
This is an abstract class, it can be used throught its inheriting classes Contraint, MultiQuery and NestedQuery.
A constraint is a single instruction to select documents. There are 4 types of constraints :
MatchAllConstraint : takes everything
ExistsConstraint : check that a specific field exists in the document (and is not null!)
MatchConstraint : check that a specific field has a specific value in the document
RangeConstraint : check that a specific field’s value is located between 2 given limits
A first example with a query returning all documents in the ricochet-lio_db.stream_info index :
import ricochet.monitoring.ricq as ricq all = ricq.MatchAllConstraint() r = all.execute("ricochet-lio_db.stream_info", nb_records=2000)
r is now an array containing all returned documents. By default, the execute function will only return the 10 first results. You can increase this number with the nb_records parameter. You can access each field of each result directly :
r[0].name
Or you can have access to the whole document :
r[0].to_dict()
We can search for documents where the start_date field is present and not null :
import ricochet.monitoring.ricq as ricq with_date = ricq.ExistsConstraint("start_date") r = with_date.execute("ricochet-lio_db.stream_info", nb_records=2000)
Or retrieve the data about the specific stream th28l002 :
import ricochet.monitoring.ricq as ricq s = ricq.MatchConstraint("name", "th28l002") r = s.execute("ricochet-lio_db.stream_info")
Select all streams started in January 2021 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") r = january.execute("ricochet-lio_db.stream_info", nb_records=500)
If you want to build more complex queries, you will need to use several constraints. You can group constraints by using the MultiQuery class, providing the operator to use : and, or, and not, or not. Let’s find the streams from January 2021 where the temperature is between 0.01 and 0.015 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
You can use a MultiQuery in a MultiQuery. If we want streams started in January 2021 with a temperature between 0.01 and 0.015 OR any stream started in Decembre 2020 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31") q2 = ricq.MultiQuery("or", q, decembre) r = q2.execute("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])
You can use the sorting_fields of the execute function to give a list of fields to use to sort the results.
Things are a bit different if you want to put constraints on a list field (for example channel_configs). You will want to select documents (streams) on values that are possibly contained in the list : this is a Nested Query. For example, we want streams which include data for channel ‘chala’ of detector ‘SEL10’ :
import ricochet.monitoring.ricq as ricq sel10 = ricq.MatchConstraint("channel_configs.det_name", "SEL10") #Constraint on the detector name chala = ricq.MatchConstraint("channel_configs.chan_name", "chala") #Constraint on the channel name select = ricq.MultiQuery("and",sel10,chala) # We want detector name AND channel name q = ricq.NetstedQuery("channel_configs", select) # This is a nested query on the channel_configs list r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
You can get the CURL format of your query by using the print_curl() method which takes the same arguments as the execute() method:
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31") q2 = ricq.MultiQuery("or", q, decembre) r = q2.print_curl("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])
- execute(index, nb_records=10, sorting_fields=[], sorting_order=1)¶
Send the query to ELS and fetch the results
- Parameters:
index – Name of the opensearch index to query.
nb_records – Maximum number of returned documents. Default is 10.
sorting_fields – List of fields to use to sort results. Default is empty (no sorting).
sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also use a list of number to sort each field in a specific order.
- Returns:
Returns a list of answers.
- print_curl(index, nb_records=10, sorting_fields=[], sorting_order=1)¶
Build and print the query formated for Curl
- Parameters:
index – Name of the opensearch index to query.
nb_records – Maximum number of returned documents. Default is 10.
sorting_fields – List of fields to use to sort results. Default is empty (no sorting).
sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also use a list of number to sort each field in a specific order.
- Returns:
Returns a list of answers.
- class ricochet.monitoring.ricq.RangeConstraint(field, min_value, max_value)¶
Bases:
Constraint