ricochet.monitoring.ricq module

class ricochet.monitoring.ricq.Constraint(field)

Bases: Query, ABC

class ricochet.monitoring.ricq.DataFetcher

Bases: object

Retrieves Ricochet slow control data from the Elasticsearch cluster. Configuration is taken from environment variables:
  • $ES_SERVER : cluster address (ex: mycluster.my.domain:9001)

  • $ES_ROOT_CA_FILE : CA cert file

Authentication is done either via basic auth:
  • $ES_USER : user name

  • $ES_PASSWORD : user password

or via client certificate:
  • $ES_USER_CERT_FILE : user certificate file

  • $ES_USER_KEY_FILE : user key file

or via Kerberos: no additional environment variable is needed, but a valid ticket must already exist on the system (obtained with the kinit command).
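
As a rough illustration of how the variables above fit together, the following sketch reports which authentication mode a given environment would select. The helper name detect_auth_mode and the precedence it applies (basic auth, then client certificate, then Kerberos) are assumptions made for this example; the order that DataFetcher itself uses is not documented here.

```python
import os


def detect_auth_mode(env=None):
    """Guess the authentication mode from the variables listed above.

    Hypothetical helper: the precedence used here is an assumption,
    not the documented behaviour of DataFetcher.
    """
    env = os.environ if env is None else env
    if not env.get("ES_SERVER"):
        raise RuntimeError("ES_SERVER is not set")
    if env.get("ES_USER") and env.get("ES_PASSWORD"):
        return "basic"
    if env.get("ES_USER_CERT_FILE") and env.get("ES_USER_KEY_FILE"):
        return "certificate"
    # No credentials in the environment: rely on an existing Kerberos ticket.
    return "kerberos"
```

Calling detect_auth_mode() with no argument inspects the current process environment, which can help when checking a shell setup before creating a DataFetcher.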

Example:

Retrieving cernox_still and cernox_50k values from the ricochet-lio.temperature index for 1 September 2019, one value per hour.

from datetime import datetime

import ricochet.monitoring.ricq as ricq

try:
    df = ricq.DataFetcher()
    # One averaged value per hour for each field over 1 September 2019
    df.fetch(
        ["ricochet-lio.temperature%cernox_still", "ricochet-lio.temperature%cernox_50k"],
        datetime(2019, 9, 1),
        datetime(2019, 9, 2),
        "1h",
    )
    d = df.get_data()
    print(d)
except (RuntimeError, ValueError) as e:
    print(e)

              datetime  cernox_still  cernox_50k
0  2019-09-01 00:00:00      0.965215   61.086022
1  2019-09-01 01:00:00      0.964848   61.088353
2  2019-09-01 02:00:00      0.965159   61.103076
3  2019-09-01 03:00:00      0.965289   61.118557
4  2019-09-01 04:00:00      0.965729   61.132924
5  2019-09-01 05:00:00      0.965760   61.143401
6  2019-09-01 06:00:00      0.965483   61.152281
7  2019-09-01 07:00:00      0.965573   61.156753
8  2019-09-01 08:00:00      0.965965   61.157790
9  2019-09-01 09:00:00      0.966453   61.159242
10 2019-09-01 10:00:00      0.966025   61.118044
11 2019-09-01 11:00:00      0.965200   61.073017
12 2019-09-01 12:00:00      0.964738   61.068119
13 2019-09-01 13:00:00      0.964744   61.083502
14 2019-09-01 14:00:00      0.964968   61.106286
15 2019-09-01 15:00:00      0.965372   61.121716
16 2019-09-01 16:00:00      0.965846   61.135635
17 2019-09-01 17:00:00      0.965594   61.148763
18 2019-09-01 18:00:00      0.966031   61.160616
19 2019-09-01 19:00:00      0.965769   61.160062
20 2019-09-01 20:00:00      0.965228   61.129484
21 2019-09-01 21:00:00      0.965299   61.104936
22 2019-09-01 22:00:00      0.965078   61.102760
23 2019-09-01 23:00:00      0.965169   61.116333

The retrieved data is an ordinary pandas.DataFrame, so you can process it however you like. You can also export it directly to a CSV file:

df.export_to_csv("data.csv")

Or create a default plot with the values:

df.plot()

If you need calendar binning (values per week, month, year, …), see the “p_interval_type” parameter of the fetch method.

clear()

Clear the data cache

disconnect()

Close the underlying connection pool

empty()

Check whether the data cache is empty

Returns:

Returns True if no data is available

export_to_csv(p_output_filename)

Export the data cache to a CSV file.

Parameters:

p_output_filename – The name of the output CSV file.

fetch(p_user_fields, p_start, p_stop, p_user_interval, p_interval_type='fixed')

Connect to the cluster and retrieve data.

Parameters:
  • p_user_fields – A list of strings containing the names of the fields to retrieve and their index with the format “index%field”. Index can be either “ricochet-lio.temperature” or “lio.cryo”.

  • p_start – A datetime.datetime date describing the start of the data period

  • p_stop – A datetime.datetime date describing the end of the data period

  • p_user_interval – The binning of the data, given as a number followed by a unit: seconds (s), minutes (m), hours (h), days (d), as in “30s”, “15m”, “30d”… The function returns the value averaged over each interval.

  • p_interval_type – Type of time interval used to average data. Can be “fixed” in standard units (15m, 2h, 6d, …) or “calendar” (minute, hour, day, week, month, year)

Raises:

ValueError – Raised if the format of the input date parameters is not known.
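
To make the “fixed” interval format concrete, here is a minimal sketch that converts a string such as “30s” or “1h” into a timedelta and counts the bins a period would contain. Both helpers are hypothetical and not part of ricq. Treating the stop date as exclusive is an assumption inferred from the 24-row output of the class example, not a documented guarantee.

```python
import re
from datetime import datetime, timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_fixed_interval(text):
    """Convert a fixed interval such as "30s" or "1h" into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text)
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"unsupported interval: {text!r}")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def expected_bins(start, stop, interval):
    """Number of whole bins between start (inclusive) and stop (exclusive)."""
    return (stop - start) // parse_fixed_interval(interval)


print(expected_bins(datetime(2019, 9, 1), datetime(2019, 9, 2), "1h"))  # prints 24
```

A quick count like this can help choose an interval before calling fetch, for example to avoid requesting far more bins than a plot can usefully show.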

get_data()

Returns the data cache

Returns:

Returns a pandas.DataFrame object containing the retrieved data.

get_data_period()

Returns the starting and ending dates of the data cache

Returns:

Returns two datetime.datetime objects containing the start and end dates, or None if not available (no data cached).
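
Because the period may be unavailable, callers should check for None before using it. The sketch below formats the returned value for display; describe_period is a hypothetical helper, and it accepts either a bare None or a pair containing None because the exact shape of the “no data” result is an assumption here.

```python
from datetime import datetime


def describe_period(period):
    """Format the value returned by get_data_period() for display.

    Accepts None or a (start, stop) pair whose items may be None,
    since the exact "no data" return shape is assumed, not documented.
    """
    if period is None:
        return "no data cached"
    start, stop = period
    if start is None or stop is None:
        return "no data cached"
    return f"{start.isoformat()} -> {stop.isoformat()} ({stop - start})"
```

With a DataFetcher named df, this would be called as describe_period(df.get_data_period()).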

plot()

Create a default plot of the cached data.

class ricochet.monitoring.ricq.ELSConnection

Bases: object

Connection to an Elasticsearch instance. This is a singleton class: the handler should always be retrieved through the get_handler() method.

Retrieving a handler to the elasticsearch instance:

from ricochet.monitoring.ricq import ELSConnection
h = ELSConnection.get_handler()

This can be used from anywhere in the code.
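
The singleton behaviour described above can be pictured with the following minimal sketch. It is not the ELSConnection implementation: the class name Connection and the placeholder handler object are assumptions used only to show why every caller receives the same handler.

```python
class Connection:
    """Minimal singleton sketch; NOT the ELSConnection implementation."""

    instance = None  # holds the single shared object once created

    def __init__(self):
        # Stand-in for the real client object (assumption for this example).
        self._handler = object()

    @classmethod
    def get_instance(cls):
        # Create the shared object on first use, then always reuse it.
        if cls.instance is None:
            cls.instance = cls()
        return cls.instance

    @classmethod
    def get_handler(cls):
        return cls.get_instance()._handler

    @classmethod
    def disconnect(cls):
        # Drop the shared object; the next get_handler() creates a new one.
        cls.instance = None
```

Two calls to Connection.get_handler() return the same object until disconnect() is called, which is the property that makes the handler safe to request from anywhere in the code.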

classmethod disconnect()

classmethod get_handler()

Class Method. Retrieve the handler on the Elasticsearch connection.

classmethod get_instance()

Class Method. Retrieve the singleton instance of the class.

instance = None

class ricochet.monitoring.ricq.ExistsConstraint(field)

Bases: Constraint

class ricochet.monitoring.ricq.MatchAllConstraint

Bases: Constraint

class ricochet.monitoring.ricq.MatchConstraint(field, value)

Bases: Constraint

class ricochet.monitoring.ricq.MultiQuery(join_operator='and', *args)

Bases: Query

class ricochet.monitoring.ricq.NetstedQuery(list_name, q)

Bases: Query

class ricochet.monitoring.ricq.Query

Bases: ABC

Base class for building queries and sending them to the Elasticsearch REST API.

This is an abstract class; use it through its subclasses Constraint, MultiQuery and NetstedQuery (the nested-query class, spelled this way in the module).

A constraint is a single instruction to select documents. There are four types of constraints:

  • MatchAllConstraint : matches every document

  • ExistsConstraint : checks that a specific field exists in the document (and is not null)

  • MatchConstraint : checks that a specific field has a specific value in the document

  • RangeConstraint : checks that a specific field’s value lies between two given limits

A first example: a query returning all documents in the ricochet-lio_db.stream_info index:

import ricochet.monitoring.ricq as ricq
match_all = ricq.MatchAllConstraint()  # avoid shadowing the built-in all()
r = match_all.execute("ricochet-lio_db.stream_info", nb_records=2000)

r is now a list containing the returned documents. By default, the execute function returns only the first 10 results; you can raise this limit with the nb_records parameter. You can access each field of each result directly:

r[0].name

Or you can have access to the whole document :

r[0].to_dict()

We can search for documents where the start_date field is present and not null:

import ricochet.monitoring.ricq as ricq
with_date = ricq.ExistsConstraint("start_date")
r = with_date.execute("ricochet-lio_db.stream_info", nb_records=2000)

Or retrieve the data about the specific stream th28l002:

import ricochet.monitoring.ricq as ricq
s = ricq.MatchConstraint("name", "th28l002")
r = s.execute("ricochet-lio_db.stream_info")

Select all streams started in January 2021:

import ricochet.monitoring.ricq as ricq
january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31")
r = january.execute("ricochet-lio_db.stream_info", nb_records=500)

To build more complex queries, combine several constraints with the MultiQuery class, passing the operator to use as its first argument: “and”, “or”, “and not” or “or not”. Let’s find the streams from January 2021 where the temperature is between 0.01 and 0.015:

import ricochet.monitoring.ricq as ricq
january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31")
temp = ricq.RangeConstraint("temperature",0.01,0.015)
q = ricq.MultiQuery("and", temp, january)
r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
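
For readers who know the Elasticsearch query DSL, the combination above corresponds roughly to a bool query. The sketch below builds such a request body with plain dictionaries. The mapping from operator names to bool clauses and the inclusive gte/lte bounds are assumptions, so the body that ricq actually sends may differ; the “or not” operator is left out because its translation is not described here.

```python
import json

# Assumed mapping from MultiQuery operator names to Elasticsearch
# bool-query clauses; the clauses ricq really emits may differ.
_CLAUSES = {"and": "must", "or": "should", "and not": "must_not"}


def range_clause(field, min_value, max_value):
    """Elasticsearch range query with inclusive bounds (assumed)."""
    return {"range": {field: {"gte": min_value, "lte": max_value}}}


def bool_query(operator, *clauses):
    """Combine clauses into an Elasticsearch bool query."""
    return {"bool": {_CLAUSES[operator]: list(clauses)}}


january = range_clause("start_date", "2021-01-01", "2021-01-31")
temp = range_clause("temperature", 0.01, 0.015)
print(json.dumps({"query": bool_query("and", temp, january)}, indent=2))
```

Because bool_query returns an ordinary clause, its result can be passed to another bool_query call, which mirrors placing one MultiQuery inside another.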

You can use a MultiQuery inside another MultiQuery. For example, to get streams started in January 2021 with a temperature between 0.01 and 0.015 OR any stream started in December 2020:

import ricochet.monitoring.ricq as ricq
january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31")
temp = ricq.RangeConstraint("temperature",0.01,0.015)
q = ricq.MultiQuery("and", temp, january)
decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31")
q2 = ricq.MultiQuery("or", q, decembre)
r = q2.execute("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])

You can use the sorting_fields parameter of the execute function to give a list of fields by which to sort the results.

Things are a bit different if you want to put constraints on a list field (for example channel_configs). You then select documents (streams) based on values that may be contained in the list: this is a nested query. For example, to find streams that include data for channel ‘chala’ of detector ‘SEL10’:

import ricochet.monitoring.ricq as ricq
sel10 = ricq.MatchConstraint("channel_configs.det_name", "SEL10") #Constraint on the detector name
chala = ricq.MatchConstraint("channel_configs.chan_name", "chala") #Constraint on the channel name
select = ricq.MultiQuery("and",sel10,chala) # We want detector name AND channel name
q = ricq.NetstedQuery("channel_configs", select) # This is a nested query on the channel_configs list
r = q.execute("ricochet-lio_db.stream_info", nb_records=500)
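
In Elasticsearch terms, the example above corresponds roughly to a nested query wrapping a bool query. The sketch below builds that body by hand; nested_query is a hypothetical helper, and the use of match clauses for MatchConstraint is an assumption about what ricq generates.

```python
def nested_query(path, query):
    """Wrap a query so it applies to the objects of a nested list field."""
    return {"nested": {"path": path, "query": query}}


# Detector name AND channel name, both evaluated on the same list element.
select = {
    "bool": {
        "must": [
            {"match": {"channel_configs.det_name": "SEL10"}},
            {"match": {"channel_configs.chan_name": "chala"}},
        ]
    }
}
body = {"query": nested_query("channel_configs", select)}
```

The nested wrapper matters because both conditions must hold for the same entry of channel_configs, rather than being satisfied by two different entries of the list.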

You can get your query in curl format by using the print_curl() method, which takes the same arguments as the execute() method:

import ricochet.monitoring.ricq as ricq
january = ricq.RangeConstraint("start_date", "2021-01-01", "2021-01-31")
temp = ricq.RangeConstraint("temperature",0.01,0.015)
q = ricq.MultiQuery("and", temp, january)
decembre = ricq.RangeConstraint("start_date", "2020-12-01", "2020-12-31")
q2 = ricq.MultiQuery("or", q, decembre)
r = q2.print_curl("ricochet-lio_db.stream_info", nb_records=500, sorting_fields=["start_date"])

execute(index, nb_records=10, sorting_fields=[], sorting_order=1)

Send the query to ELS and fetch the results

Parameters:
  • index – Name of the elasticsearch index to query.

  • nb_records – Maximum number of returned documents. Default is 10.

  • sorting_fields – List of fields to use to sort results. Default is empty (no sorting).

  • sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also pass a list of numbers to sort each field in a specific order.

Returns:

Returns a list of answers.
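
The sign convention for sorting_order can be illustrated with a small sketch that turns the two sorting parameters into an Elasticsearch sort clause. build_sort is a hypothetical helper written for this example; how execute builds its sort clause internally is not documented here, and requiring one order per field when a list is given is an assumption.

```python
def build_sort(sorting_fields, sorting_order=1):
    """Build an Elasticsearch "sort" list from fields and order codes.

    sorting_order is a number applied to every field, or a list with one
    number per field: negative means descending, zero or positive ascending.
    """
    if isinstance(sorting_order, (int, float)):
        orders = [sorting_order] * len(sorting_fields)
    else:
        orders = list(sorting_order)
        if len(orders) != len(sorting_fields):
            raise ValueError("need one sorting order per sorting field")
    return [
        {field: {"order": "desc" if order < 0 else "asc"}}
        for field, order in zip(sorting_fields, orders)
    ]
```

For instance, build_sort(["start_date", "name"], [-1, 1]) sorts by start_date in descending order and then by name in ascending order.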

print_curl(index, nb_records=10, sorting_fields=[], sorting_order=1)

Build and print the query formatted for curl

Parameters:
  • index – Name of the elasticsearch index to query.

  • nb_records – Maximum number of returned documents. Default is 10.

  • sorting_fields – List of fields to use to sort results. Default is empty (no sorting).

  • sorting_order – Order of the sorting. Can be <0 for DESC or >=0 for ASC. You can also pass a list of numbers to sort each field in a specific order.

Returns:

Returns a list of answers.

class ricochet.monitoring.ricq.RangeConstraint(field, min_value, max_value)

Bases: Constraint