ricochet.monitoring.ricq module¶
- class ricochet.monitoring.ricq.DataFetcher¶
Bases:
object
- Used to retrieve Ricochet slow control data from the ElasticSearch cluster. Configuration will be taken from environment variables :
$ES_SERVER : cluster address (ex: mycluster.my.domain:9001)
$ES_ROOT_CA_FILE : CA cert file
- Authentication is done either via basic auth :
$ES_USER : user name
$ES_PASSWORD : user password
- or via client certificate :
$ES_USER_CERT_FILE : user certificate file
$ES_USER_KEY_FILE : user key file
or via Kerberos : no additional environment variable, a token must exist on the system (kinit command)
- Example:
Retrieving cernox_still and cernox_50k values from the lio.temperature index for the 01/09/2019, one value per hour.
from datetime import datetime import ricochet.monitoring.ricq as ricq try: df = ricq.DataFetcher() df.fetch(["ricochet-lio.temperature%cernox_still","ricochet-lio.temperature%cernox_50k"] ,datetime(2019,9,1), datetime(2019,9,2), "1h") d = df.get_data() print(d) except RuntimeError as e: print(e) except ValueError as e: print(e)
datetime cernox_still cernox_50k 0 2019-09-01 00:00:00 0.965215 61.086022 1 2019-09-01 01:00:00 0.964848 61.088353 2 2019-09-01 02:00:00 0.965159 61.103076 3 2019-09-01 03:00:00 0.965289 61.118557 4 2019-09-01 04:00:00 0.965729 61.132924 5 2019-09-01 05:00:00 0.965760 61.143401 6 2019-09-01 06:00:00 0.965483 61.152281 7 2019-09-01 07:00:00 0.965573 61.156753 8 2019-09-01 08:00:00 0.965965 61.157790 9 2019-09-01 09:00:00 0.966453 61.159242 10 2019-09-01 10:00:00 0.966025 61.118044 11 2019-09-01 11:00:00 0.965200 61.073017 12 2019-09-01 12:00:00 0.964738 61.068119 13 2019-09-01 13:00:00 0.964744 61.083502 14 2019-09-01 14:00:00 0.964968 61.106286 15 2019-09-01 15:00:00 0.965372 61.121716 16 2019-09-01 16:00:00 0.965846 61.135635 17 2019-09-01 17:00:00 0.965594 61.148763 18 2019-09-01 18:00:00 0.966031 61.160616 19 2019-09-01 19:00:00 0.965769 61.160062 20 2019-09-01 20:00:00 0.965228 61.129484 21 2019-09-01 21:00:00 0.965299 61.104936 22 2019-09-01 22:00:00 0.965078 61.102760 23 2019-09-01 23:00:00 0.965169 61.116333
You can do whatever you want with the retrieved data. You can also directly export the data into a CSV file :
df.export_to_csv("data.csv")
Or create a default plot with the values :
df.plot()
If you need a calendar binning (values per weeks, months, years, …), check out the “p_interval_type” parameter of the fetch method.
- clear()¶
Clear the data cache
- disconnect()¶
Disconnection of the underlying connection pool
- empty()¶
Check if data are available
- Returns:
Returns True if no data are available
- export_to_csv(p_output_filename)¶
Export the data cache in a CSV file.
- Parameters:
p_output_filename – The name of the output CSV file.
- fetch(p_user_fields, p_start, p_stop, p_user_interval, p_interval_type='fixed')¶
Connect to the cluster and retrieve data.
- Parameters:
p_user_fields – A list of strings containing the names of the fields to retrieve and their index with the format “index%field”. Index can be either “ricochet-lio.temperature” or “lio.cryo”.
p_start – A
datetime.datetime
date describing the start of the data periodp_stop – A
datetime.datetime
date describing the end of the data periodp_user_interval – The binning of the data : seconds (s), minutes (m), hours (h), days (d), as in “30s”, “15m”, “30d”… The function will return an averaged value on the given interval.
p_interval_type – Type of time interval used to average data. Can be “fixed” in standard units (15m, 2h, 6d, …) or “calendar” (minute, hour, day, week, month, year)
- Raises:
ValueError – Raised if the format of the input date parameters is not known.
- get_data()¶
Returns the data cache
- Returns:
Returns a
pandas.DataFrame
object containing the retrieved data.
- get_data_period()¶
Returns the starting and ending dates of the data cache
- Returns:
Returns two
Datetime.Datetime
objects containing the start and end dates, None if not available (no data cached).
- plot()¶
Create a default plot of the cached data.
- class ricochet.monitoring.ricq.ELSConnection¶
Bases:
object
Connection to an ElasticSearch instance. This is a singleton class, the handler should always be retrieved through the get_handler() method.
Retrieving a handler to the elasticsearch instance:
from ricochet.monitoring.ricq import ELSConnection h = ELSConnection.get_handler()
This can be used from anywhere in the code.
- classmethod disconnect()¶
- classmethod get_handler()¶
Class Method. Retrieve the handler on the elasticsearch connexion.
- classmethod get_instance()¶
Class Method. Retrieve the singleton instance of the class.
- instance = None¶
- class ricochet.monitoring.ricq.ExistsConstraint(field)¶
Bases:
Constraint
- class ricochet.monitoring.ricq.MatchAllConstraint¶
Bases:
Constraint
- class ricochet.monitoring.ricq.MatchConstraint(field, value)¶
Bases:
Constraint
- class ricochet.monitoring.ricq.Query¶
Bases:
ABC
Class allowing to build queries and send them to the ElasticSearch REST API.
This is an abstract class, it can be used throught its inheriting classes Contraint, MultiQuery and NestedQuery.
A constraint is a single instruction to select documents. There are 4 types of constraints :
MatchAllConstraint : takes everything
ExistsConstraint : check that a specific field exists in the document (and is not null!)
MatchConstraint : check that a specific field has a specific value in the document
RangeConstraint : check that a specific field’s value is located between 2 given limits
A first example with a query returning all documents in the ricochet-lio_db.stream_info index :
import ricochet.monitoring.ricq as ricq all = ricq.MatchAllConstraint() r = all.execute("ricochet-lio_db.stream_info", nb_records=2000)
r is now an array containing all returned documents. By default, the execute function will only return the 10 first results. You can increase this number with the nb_records parameter. You can access each field of each result directly :
r[0].name
Or you can have access to the whole document :
r[0].to_dict()
We can search for documents where the start_date field is present and not null :
import ricochet.monitoring.ricq as ricq with_date = ricq.ExistsConstraint("start_date") r = with_date.execute("ricochet-lio_db.stream_info", nb_records=2000)
Or retrieve the data about the specific stream th28l002 :
import ricochet.monitoring.ricq as ricq s = ricq.MatchConstraint("name", "th28l002") r = s.execute("ricochet-lio_db.stream_info")
Select all streams started in January 2021 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") r = january.execute("ricochet-lio_db.stream_info", nb_records=500)
If you want to build more complex queries, you will need to use several constraints. You can group constraints by using the MultiQuery class, providing the operator to use : and, or, and not, or not. Let’s find the streams from January 2021 where the temperature is between 0.01 and 0.015 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
You can use a MultiQuery in a MultiQuery. If we want streams started in January 2021 with a temperature between 0.01 and 0.015 OR any stream started in Decembre 2020 :
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31") q2 = ricq.MultiQuery("or", q, decembre) r = q2.execute("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])
You can use the sorting_fields of the execute function to give a list of fields to use to sort the results.
Things are a bit different if you want to put constraints on a list field (for example channel_configs). You will want to select documents (streams) on values that are possibly contained in the list : this is a Nested Query. For example, we want streams which include data for channel ‘chala’ of detector ‘SEL10’ :
import ricochet.monitoring.ricq as ricq sel10 = ricq.MatchConstraint("channel_configs.det_name", "SEL10") #Constraint on the detector name chala = ricq.MatchConstraint("channel_configs.chan_name", "chala") #Constraint on the channel name select = ricq.MultiQuery("and",sel10,chala) # We want detector name AND channel name q = ricq.NetstedQuery("channel_configs", select) # This is a nested query on the channel_configs list r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
You can get the CURL format of your query by using the print_curl() method which takes the same arguments as the execute() method:
import ricochet.monitoring.ricq as ricq january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31") temp = ricq.RangeConstraint("temperature",0.01,0.015) q = ricq.MultiQuery("and", temp, january) decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31") q2 = ricq.MultiQuery("or", q, decembre) r = q2.print_curl("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])
- execute(index, nb_records=10, sorting_fields=[], sorting_order=1)¶
Send the query to ELS and fetch the results
- Parameters:
index – Name of the elasticsearch index to query.
nb_records – Maximum number of returned documents. Default is 10.
sorting_fields – List of fields to use to sort results. Default is empty (no sorting).
sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also use a list of number to sort each field in a specific order.
- Returns:
Returns a list of answers.
- print_curl(index, nb_records=10, sorting_fields=[], sorting_order=1)¶
Build and print the query formated for Curl
- Parameters:
index – Name of the elasticsearch index to query.
nb_records – Maximum number of returned documents. Default is 10.
sorting_fields – List of fields to use to sort results. Default is empty (no sorting).
sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also use a list of number to sort each field in a specific order.
- Returns:
Returns a list of answers.
- class ricochet.monitoring.ricq.RangeConstraint(field, min_value, max_value)¶
Bases:
Constraint