Coverage for watcher/decision_engine/datasources/grafana_translator/influxdb.py: 93%

38 statements  

coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils

from watcher.common import exception
from watcher.decision_engine.datasources.grafana_translator.base import \
    BaseGrafanaTranslator

CONF = cfg.CONF
LOG = log.getLogger(__name__)


class InfluxDBGrafanaTranslator(BaseGrafanaTranslator):
    """Grafana translator to communicate with InfluxDB database"""

    NAME = 'influxdb'

    def __init__(self, data):
        super(InfluxDBGrafanaTranslator, self).__init__(data)

    def build_params(self):
40 """""" 

41 

42 data = self._data 

43 

        retention_period = None
        available_periods = CONF.grafana_translators.retention_periods.items()
        for key, value in sorted(available_periods, key=lambda x: x[1]):
            if int(data['period']) < int(value):
                retention_period = key
                break

        if retention_period is None:
            retention_period = max(available_periods, key=lambda x: x[1])[0]
            LOG.warning("Longest retention period is too short for the"
                        " desired period")
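        # For example, with a purely illustrative retention_periods mapping of
        # {'one_week': 604800, 'one_month': 2592000} and a requested period of
        # 7200 seconds, the loop above picks 'one_week', the shortest
        # retention period that still covers the request.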

        try:
            resource = self._extract_attribute(
                data['resource'], data['attribute'])
        except AttributeError:
            LOG.error("Resource: %s does not contain attribute %s",
                      data['resource'], data['attribute'])
            raise
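        # Note that ``resource`` now holds the value of the configured
        # attribute (for instance a hostname) rather than the resource object
        # itself; it is passed to _query_format() in the return statement
        # below.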

        # Granularity is optional; if it is None the minimal value for
        # InfluxDB, 1, is used.
        granularity = \
            data['granularity'] if data['granularity'] is not None else 1

        return {'db': data['db'],
                'epoch': 'ms',
                'q': self._query_format(
                    data['query'], data['aggregate'], resource, data['period'],
                    granularity, retention_period)}
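        # Illustrative shape of the returned parameters (actual values depend
        # on the datasource configuration and query template):
        # {'db': 'telegraf', 'epoch': 'ms', 'q': 'SELECT mean("cpu") ...'}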

    def extract_result(self, raw_results):
        """Extract the aggregated metric value from the raw InfluxDB result."""
        try:
            # For result structure see:
            # https://docs.openstack.org/watcher/latest/datasources/grafana.html#InfluxDB
            result = jsonutils.loads(raw_results)
            result = result['results'][0]['series'][0]
            index_aggregate = result['columns'].index(self._data['aggregate'])
            return result['values'][0][index_aggregate]
        except KeyError:
            LOG.error("Could not extract %s for the resource: %s",
                      self._data['metric'], self._data['resource'])
            raise exception.NoSuchMetricForHost(
                metric=self._data['metric'], host=self._data['resource'])
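Below is a minimal, self-contained sketch of the extraction performed by extract_result(): the nesting (results → series → columns/values) follows the InfluxDB response structure referenced in the documentation link above, while the column names and numbers are illustrative assumptions only.

import json

# Illustrative InfluxDB-style response body; only the nesting matters.
raw_results = json.dumps({
    'results': [{
        'series': [{
            'columns': ['time', 'mean'],
            'values': [[1750161720000, 12.5]],
        }],
    }],
})

result = json.loads(raw_results)['results'][0]['series'][0]
# 'mean' plays the role of self._data['aggregate'] in the method above.
index_aggregate = result['columns'].index('mean')
print(result['values'][0][index_aggregate])  # -> 12.5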