Coverage for watcher/decision_engine/datasources/grafana.py: 89%
117 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
3#
4# Authors: Corne Lukken <info@dantalion.nl>
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
19from urllib import parse as urlparse
21from http import HTTPStatus
22from oslo_config import cfg
23from oslo_log import log
25from watcher._i18n import _
26from watcher.common import clients
27from watcher.common import exception
28from watcher.decision_engine.datasources import base
29from watcher.decision_engine.datasources.grafana_translator import influxdb
31import requests
# Module-wide configuration and logger handles.
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class GrafanaHelper(base.DataSourceBase):
    """Datasource helper that retrieves metrics through a Grafana proxy.

    Requests are issued against the Grafana HTTP API datasource proxy using
    a bearer token; raw responses are interpreted by per-database translator
    classes (currently only InfluxDB).
    """

    NAME = 'grafana'

    # METRIC_MAP is only populated at runtime by _build_metric_map, since its
    # contents depend on configuration options. NOTE(review): this is a class
    # attribute mutated through instances, so all instances share one map.
    METRIC_MAP = dict()

    # All available translators
    TRANSLATOR_LIST = [
        influxdb.InfluxDBGrafanaTranslator.NAME
    ]

    def __init__(self, osc=None):
        """:param osc: an OpenStackClients instance"""
        self.osc = osc if osc else clients.OpenStackClients()
        self.nova = self.osc.nova()
        self.configured = False
        self._base_url = None
        self._headers = None
        self._setup()

    def _setup(self):
        """Configure grafana helper to perform requests.

        Validates the token and base_url options and builds the metric map.
        On any problem the method returns early, leaving ``self.configured``
        False so later requests raise DataSourceNotAvailable.
        """
        token = CONF.grafana_client.token
        base_url = CONF.grafana_client.base_url

        if not token:
            LOG.critical("GrafanaHelper authentication token not configured")
            return
        self._headers = {"Authorization": "Bearer " + token,
                         "Content-Type": "Application/json"}

        if not base_url:
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url")
            return
        self._base_url = base_url

        # Very basic url parsing: all three components must be present for
        # the proxy endpoint URL to be well formed.
        parse = urlparse.urlparse(self._base_url)
        if parse.scheme == '' or parse.netloc == '' or parse.path == '':
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url and project_id")
            return

        self._build_metric_map()

        if len(self.METRIC_MAP) == 0:
            LOG.critical("GrafanaHelper not configured for any metrics")

        self.configured = True

    def _build_metric_map(self):
        """Build the metric map by reading config information.

        A metric is registered only when its project, database and query are
        all configured and its translator is one of TRANSLATOR_LIST; metrics
        missing from any companion map are logged and skipped.
        """
        for key, value in CONF.grafana_client.database_map.items():
            try:
                project = CONF.grafana_client.project_id_map[key]
                attribute = CONF.grafana_client.attribute_map[key]
                translator = CONF.grafana_client.translator_map[key]
                query = CONF.grafana_client.query_map[key]
                if project is not None and \
                        value is not None and\
                        translator in self.TRANSLATOR_LIST and\
                        query is not None:
                    self.METRIC_MAP[key] = {
                        'db': value,
                        'project': project,
                        'attribute': attribute,
                        'translator': translator,
                        'query': query
                    }
            except KeyError as e:
                # Metric absent from one of the companion maps; skip it but
                # keep processing the remaining metrics.
                LOG.error(e)

    def _build_translator_schema(self, metric, db, attribute, query, resource,
                                 resource_type, period, aggregate,
                                 granularity):
        """Create dictionary to pass to grafana proxy translators"""
        return {'metric': metric, 'db': db, 'attribute': attribute,
                'query': query, 'resource': resource,
                'resource_type': resource_type, 'period': period,
                'aggregate': aggregate, 'granularity': granularity}

    def _get_translator(self, name, data):
        """Use the names of translators to get the translator for the metric

        :raises InvalidParameter: if ``name`` matches no known translator
        """
        if name == influxdb.InfluxDBGrafanaTranslator.NAME:
            return influxdb.InfluxDBGrafanaTranslator(data)
        else:
            raise exception.InvalidParameter(
                parameter='name', parameter_type='grafana translator')

    def _request(self, params, project_id):
        """Make the request to the endpoint to retrieve data.

        If the request fails, determines what error to raise.

        :raises DataSourceNotAvailable: when the helper is unconfigured or
            the response status is not 200 OK
        """
        if self.configured is False:
            raise exception.DataSourceNotAvailable(self.NAME)

        resp = requests.get(self._base_url + str(project_id) + '/query',
                            params=params, headers=self._headers,
                            timeout=CONF.grafana_client.http_timeout)
        if resp.status_code == HTTPStatus.OK:
            return resp
        elif resp.status_code == HTTPStatus.BAD_REQUEST:
            LOG.error("Query for metric is invalid")
        elif resp.status_code == HTTPStatus.UNAUTHORIZED:
            LOG.error("Authorization token is invalid")
        raise exception.DataSourceNotAvailable(self.NAME)

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Get the value for the specific metric based on specified parameters

        :returns: the translated result, or None when the datasource did not
            respond
        :raises MetricNotAvailable: when ``meter_name`` is not in METRIC_MAP
        """
        # Single EAFP lookup instead of re-indexing METRIC_MAP per field.
        try:
            metric = self.METRIC_MAP[meter_name]
        except KeyError:
            LOG.error(
                "Metric: %s does not appear in the current Grafana metric map",
                meter_name)
            raise exception.MetricNotAvailable(metric=meter_name)

        data = self._build_translator_schema(
            meter_name, metric['db'], metric['attribute'], metric['query'],
            resource, resource_type, period, aggregate, granularity)

        translator = self._get_translator(metric['translator'], data)

        params = translator.build_params()

        raw_kwargs = dict(
            params=params,
            project_id=metric['project'],
        )
        # Drop falsy entries so query_retry only receives real values.
        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}

        resp = self.query_retry(self._request, **kwargs)
        if not resp:
            LOG.warning("Datasource %s is not available.", self.NAME)
            return

        return translator.extract_result(resp.content)

    def statistic_series(self, resource=None, resource_type=None,
                         meter_name=None, start_time=None, end_time=None,
                         granularity=300):
        raise NotImplementedError(
            _('Grafana helper does not support statistic series method'))

    def get_host_cpu_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_cpu_usage', period, aggregate,
            granularity)

    def get_host_ram_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_ram_usage', period, aggregate,
            granularity)

    def get_host_outlet_temp(self, resource, period=300,
                             aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_outlet_temp', period, aggregate,
            granularity)

    def get_host_inlet_temp(self, resource, period=300,
                            aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_inlet_temp', period, aggregate,
            granularity)

    def get_host_airflow(self, resource, period=300,
                         aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_airflow', period, aggregate,
            granularity)

    def get_host_power(self, resource, period=300,
                       aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_power', period, aggregate,
            granularity)

    def get_instance_cpu_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_cpu_usage', period, aggregate,
            granularity)

    def get_instance_ram_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_usage', period, aggregate,
            granularity)

    def get_instance_ram_allocated(self, resource, period=300,
                                   aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_allocated', period, aggregate,
            granularity)

    def get_instance_l3_cache_usage(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_l3_cache_usage', period, aggregate,
            granularity)

    def get_instance_root_disk_size(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_root_disk_size', period, aggregate,
            granularity)