Coverage for watcher/decision_engine/datasources/grafana_translator/influxdb.py: 93%
38 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
3#
4# Authors: Corne Lukken <info@dantalion.nl>
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
19from oslo_config import cfg
20from oslo_log import log
21from oslo_serialization import jsonutils
23from watcher.common import exception
24from watcher.decision_engine.datasources.grafana_translator.base import \
25 BaseGrafanaTranslator
27CONF = cfg.CONF
28LOG = log.getLogger(__name__)
class InfluxDBGrafanaTranslator(BaseGrafanaTranslator):
    """Grafana translator to communicate with InfluxDB database"""

    NAME = 'influxdb'

    def __init__(self, data):
        super(InfluxDBGrafanaTranslator, self).__init__(data)

    def build_params(self):
        """Build the request parameters for an InfluxDB query.

        The retention period is chosen from
        ``CONF.grafana_translators.retention_periods``: the shortest
        configured period that is strictly longer than the requested
        ``period`` wins. When none is long enough, the longest configured
        period is used and a warning is logged.

        :returns: dict with the keys ``db``, ``epoch`` and ``q``, where ``q``
                  is the query produced by ``self._query_format``.
        :raises AttributeError: re-raised (after logging) when
                                ``self._extract_attribute`` cannot find the
                                requested attribute on the resource.
        """

        data = self._data

        def period_length(item):
            # Configured lengths may be ints or strings; always compare them
            # numerically so ordering matches the int() comparison below.
            return int(item[1])

        available_periods = CONF.grafana_translators.retention_periods.items()
        requested_period = int(data['period'])

        retention_period = None
        for key, value in sorted(available_periods, key=period_length):
            if requested_period < int(value):
                retention_period = key
                break

        if retention_period is None:
            # Select by length, not by name: max() on the raw (name, length)
            # tuples would return the alphabetically greatest name.
            retention_period = max(available_periods, key=period_length)[0]
            LOG.warning("Longest retention period is to short for desired"
                        " period")

        try:
            resource = self._extract_attribute(
                data['resource'], data['attribute'])
        except AttributeError:
            LOG.error("Resource: %s does not contain attribute %s",
                      data['resource'], data['attribute'])
            raise

        # Granularity is optional if it is None the minimal value for InfluxDB
        # will be 1
        granularity = \
            data['granularity'] if data['granularity'] is not None else 1

        return {'db': data['db'],
                'epoch': 'ms',
                'q': self._query_format(
                    data['query'], data['aggregate'], resource, data['period'],
                    granularity, retention_period)}

    def extract_result(self, raw_results):
        """Extract the aggregated value from a raw InfluxDB response.

        The value is read from the first row of the first series of the
        first result, in the column named after the configured aggregate.

        :param raw_results: JSON encoded response body.
        :returns: the value found in the aggregate column.
        :raises NoSuchMetricForHost: when the decoded response lacks the
                                     expected keys, contains no
                                     result/series/row, or has no column for
                                     the aggregate.
        """
        # For result structure see:
        # https://docs.openstack.org/watcher/latest/datasources/grafana.html#InfluxDB
        # Decoding stays outside the try block so that malformed JSON keeps
        # propagating its own error instead of being reported as a missing
        # metric.
        result = jsonutils.loads(raw_results)
        try:
            series = result['results'][0]['series'][0]
            index_aggregate = series['columns'].index(self._data['aggregate'])
            return series['values'][0][index_aggregate]
        except (KeyError, IndexError, ValueError):
            # IndexError: an empty results/series/values list.
            # ValueError: the aggregate is not among the returned columns.
            LOG.error("Could not extract %s for the resource: %s",
                      self._data['metric'], self._data['resource'])
            raise exception.NoSuchMetricForHost(
                metric=self._data['metric'], host=self._data['resource'])