Coverage for watcher/decision_engine/datasources/prometheus.py: 92%

173 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# Copyright 2024 Red Hat, Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14# 

15from observabilityclient import prometheus_client 

16from oslo_config import cfg 

17from oslo_log import log 

18import re 

19from watcher._i18n import _ 

20from watcher.common import exception 

21from watcher.decision_engine.datasources import base 

22 

23CONF = cfg.CONF 

24LOG = log.getLogger(__name__) 

25 

26 

class PrometheusHelper(base.DataSourceBase):
    """PrometheusHelper class for retrieving metrics from Prometheus server

    This class implements the DataSourceBase to allow Watcher to query
    Prometheus as a data source for metrics.
    """

    NAME = 'prometheus'
    # Watcher metric name -> Prometheus meter name. A value of None means
    # the metric is not supported by this datasource. The 'instance.*'
    # pseudo-meters are served from the VM inventory, not from Prometheus.
    METRIC_MAP = dict(host_cpu_usage='node_cpu_seconds_total',
                      host_ram_usage='node_memory_MemAvailable_bytes',
                      host_outlet_temp=None,
                      host_inlet_temp=None,
                      host_airflow=None,
                      host_power=None,
                      instance_cpu_usage='ceilometer_cpu',
                      instance_ram_usage='ceilometer_memory_usage',
                      instance_ram_allocated='instance.memory',
                      instance_l3_cache_usage=None,
                      instance_root_disk_size='instance.disk',
                      )
    # Watcher aggregate -> PromQL aggregate. 'count' is not supported by
    # this datasource and is mapped to 'avg' (a warning is logged).
    AGGREGATES_MAP = dict(mean='avg', max='max', min='min', count='avg')

    def __init__(self):
        """Initialise the PrometheusHelper

        The prometheus helper uses the PrometheusAPIClient provided by
        python-observabilityclient.
        The prometheus_fqdn_labels contains a list of the values contained in
        the fqdn_label in the Prometheus instance. When making queries to
        Prometheus we use the fqdn_label to specify the node for which
        metrics are to be retrieved.
        host, port and fqdn_label come from watcher_client
        config. The prometheus_fqdn_label allows override of the required
        label in Prometheus scrape configs that specifies each target's fqdn.
        """
        self.prometheus = self._setup_prometheus_client()
        self.prometheus_fqdn_label = (
            CONF.prometheus_client.fqdn_label
        )
        self.prometheus_fqdn_labels = (
            self._build_prometheus_fqdn_labels()
        )
        self.prometheus_host_instance_map = (
            self._build_prometheus_host_instance_map()
        )

    def _setup_prometheus_client(self):
        """Initialise the prometheus client with config options

        Use the prometheus_client options in watcher.conf to setup
        the PrometheusAPIClient client object and return it.
        :raises watcher.common.exception.MissingParameter if
            prometheus host or port is not set in the watcher.conf
            under the [prometheus_client] section.
        :raises watcher.common.exception.InvalidParameter if
            the prometheus host or port have invalid format.
        """
        def _validate_host_port(host, port):
            if len(host) > 255:
                return (False, "hostname is too long: '%s'" % host)
            if host[-1] == '.':
                # strip a single trailing dot from an absolute FQDN
                host = host[:-1]
            legal_hostname = re.compile(
                "(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
            if not all(legal_hostname.match(host_part)
                       for host_part in host.split(".")):
                return (False, "hostname '%s' failed regex match " % host)
            try:
                # NOTE: explicit range check rather than assert, because
                # asserts are stripped when python runs with -O.
                if not 1 <= int(port) <= 65535:
                    raise ValueError
            except ValueError:
                return (False, "missing or invalid port number '%s' "
                        % port)
            return (True, "all good")

        _host = CONF.prometheus_client.host
        _port = CONF.prometheus_client.port
        if (not _host or not _port):
            raise exception.MissingParameter(
                message=(_(
                    "prometheus host and port must be set in watcher.conf "
                    "under the [prometheus_client] section. Can't initialise "
                    "the datasource without valid host and port."))
            )
        validated, reason = _validate_host_port(_host, _port)
        if (not validated):
            raise exception.InvalidParameter(
                message=(_(
                    "A valid prometheus host and port are required. The "
                    "values found in watcher.conf are '%(host)s' '%(port)s'. "
                    "This fails validation for the following reason: "
                    "%(reason)s.")
                    % {'host': _host, 'port': _port, 'reason': reason})
            )
        the_client = prometheus_client.PrometheusAPIClient(
            "%s:%s" % (_host, _port))

        # check if tls options or basic_auth options are set and use them
        prometheus_user = CONF.prometheus_client.username
        prometheus_pass = CONF.prometheus_client.password
        prometheus_ca_cert = CONF.prometheus_client.cafile
        prometheus_client_cert = CONF.prometheus_client.certfile
        prometheus_client_key = CONF.prometheus_client.keyfile
        if (prometheus_ca_cert):
            the_client.set_ca_cert(prometheus_ca_cert)
            if (prometheus_client_cert and prometheus_client_key):
                the_client.set_client_cert(
                    prometheus_client_cert, prometheus_client_key)
        if (prometheus_user and prometheus_pass):
            the_client.set_basic_auth(prometheus_user, prometheus_pass)

        return the_client

    def _build_prometheus_fqdn_labels(self):
        """Build the list of fqdn_label values to be used in host queries

        Watcher knows nodes by their hostname. In Prometheus however the
        scrape targets (also known as 'instances') are specified by I.P.
        (or hostname) and port number and fqdn is stored in a custom 'fqdn'
        label added to Prometheus scrape_configs. Operators can use a
        different custom label instead by setting the prometheus_fqdn_label
        config option under the prometheus_client section of watcher config.
        The built prometheus_fqdn_labels is created with the full list
        of values of the prometheus_fqdn_label label in Prometheus. This will
        be used to create a map of hostname<-->fqdn and to identify if a
        target exists in prometheus for the compute nodes before sending the
        query.
        :return a set of values of the fqdn label. For example:
            {'foo.example.com', 'bar.example.com'}
            {'foo', 'bar'}
        """
        prometheus_targets = self.prometheus._get(
            "targets?state=active")['data']['activeTargets']
        # >>> prometheus_targets[0]['labels']
        # {'fqdn': 'marios-env-again.controlplane.domain',
        #  'instance': 'localhost:9100', 'job': 'node'}
        fqdn_instance_labels = set()
        for target in prometheus_targets:
            if target.get('labels', {}).get(self.prometheus_fqdn_label):
                fqdn_instance_labels.add(
                    target['labels'].get(self.prometheus_fqdn_label))

        if not fqdn_instance_labels:
            LOG.error(
                "Could not create fqdn labels list from Prometheus "
                "targets config. Prometheus returned the following: %s",
                prometheus_targets
            )
            return set()
        return fqdn_instance_labels

    def _build_prometheus_host_instance_map(self):
        """Build the hostname<-->instance_label mapping needed for queries

        The prometheus_fqdn_labels has the fully qualified domain name
        for hosts. This will create a duplicate map containing only the host
        name part. Depending on the watcher node.hostname either the
        fqdn_instance_labels or the host_instance_map will be used to resolve
        the correct prometheus fqdn_label for queries. In the event the
        fqdn_instance_labels elements are not valid fqdn (for example it has
        hostnames, not fqdn) the host_instance_map cannot be created and
        an empty dictionary is returned with a warning logged.
        :return a dict mapping hostname to instance label. For example:
            {'foo': 'foo.example.com', 'bar': 'bar.example.com'}
        """
        if not self.prometheus_fqdn_labels:
            LOG.error("Cannot build host_instance_map without "
                      "fqdn_instance_labels")
            return {}
        # Only entries that contain a '.' are valid fqdn; the short host
        # name is the first dot-separated component.
        host_instance_map = {
            host: fqdn for (host, fqdn) in (
                (fqdn.split('.')[0], fqdn)
                for fqdn in self.prometheus_fqdn_labels
                if '.' in fqdn
            )
        }
        if not host_instance_map:
            LOG.warning("Creating empty host instance map. Are the keys "
                        "in prometheus_fqdn_labels valid fqdn?")
            return {}
        return host_instance_map

    def _resolve_prometheus_instance_label(self, node_name):
        """Resolve the prometheus instance label to use in queries

        Given the watcher node.hostname, resolve the prometheus instance
        label for use in queries, first trying the fqdn_instance_labels and
        then the host_instance_map (watcher.node_name can be fqdn or
        hostname). If the name is not resolved after the first attempt,
        rebuild the fqdn and host instance maps and try again. This allows
        for new hosts added after the initialisation of the
        fqdn_instance_labels.
        :param node_name: the watcher node.hostname
        :return String for the prometheus instance label and None if not
            found
        """
        def _query_maps(node):
            if node in self.prometheus_fqdn_labels:
                return node
            else:
                return self.prometheus_host_instance_map.get(node, None)

        instance_label = _query_maps(node_name)
        # refresh the fqdn and host instance maps and retry
        if not instance_label:
            self.prometheus_fqdn_labels = (
                self._build_prometheus_fqdn_labels()
            )
            self.prometheus_host_instance_map = (
                self._build_prometheus_host_instance_map()
            )
            instance_label = _query_maps(node_name)

        if not instance_label:
            LOG.error("Cannot query prometheus without instance label. "
                      "Could not resolve %s", node_name)
            return None
        return instance_label

    def _resolve_prometheus_aggregate(self, watcher_aggregate, meter):
        """Resolve the prometheus aggregate using self.AGGREGATES_MAP

        This uses the AGGREGATES_MAP to resolve the correct prometheus
        aggregate to use in queries, from the given watcher aggregate
        :raises watcher.common.exception.InvalidParameter if the aggregate
            is not in AGGREGATES_MAP.
        """
        if watcher_aggregate == 'count':
            LOG.warning('Prometheus data source does not currently support '
                        ' the count aggregate. Proceeding with mean (avg).')
        promql_aggregate = self.AGGREGATES_MAP.get(watcher_aggregate)
        if not promql_aggregate:
            raise exception.InvalidParameter(
                message=(_("Unknown Watcher aggregate %s. This does not "
                           "resolve to any valid prometheus query aggregate.")
                         % watcher_aggregate)
            )
        return promql_aggregate

    def _build_prometheus_query(self, aggregate, meter, instance_label,
                                period, resource=None):
        """Build and return the prometheus query string with the given args

        This function builds and returns the string query that will be sent
        to the Prometheus server /query endpoint. For host cpu usage we use:

            100 - (avg by (fqdn)(rate(node_cpu_seconds_total{mode='idle',
            fqdn='some_host'}[300s])) * 100)

        so using prometheus rate function over the specified period, we
        average per instance (all cpus) idle time and then 'everything else'
        is cpu usage time.

        For host memory usage we use:

            (node_memory_MemTotal_bytes{instance='the_host'} -
             avg_over_time(
               node_memory_MemAvailable_bytes{instance='the_host'}[300s]))
            / 1024 / 1024

        So we take total and subtract available memory to determine
        how much is in use. We use the prometheus xxx_over_time functions
        avg/max/min depending on the aggregate with the specified time
        period.

        :param aggregate: one of the values of self.AGGREGATES_MAP
        :param meter: the name of the Prometheus meter to use
        :param instance_label: the Prometheus instance label (scrape target).
        :param period: the period in seconds for which to query
        :param resource: the resource object for which metrics are requested
        :return: a String containing the Prometheus query
        :raises watcher.common.exception.InvalidParameter if params are None
        :raises watcher.common.exception.InvalidParameter if meter is not
            known or currently supported (prometheus meter name).
        """
        query_args = None
        uuid_label_key = CONF.prometheus_client.instance_uuid_label
        if (meter is None or aggregate is None or instance_label is None or
                period is None):
            raise exception.InvalidParameter(
                message=(_(
                    "Cannot build prometheus query without args. "
                    "You provided: meter %(mtr)s, aggregate %(agg)s, "
                    "instance_label %(inst)s, period %(prd)s")
                    % {'mtr': meter, 'agg': aggregate,
                       'inst': instance_label, 'prd': period})
            )

        if meter == 'node_cpu_seconds_total':
            query_args = (
                "100 - (%(agg)s by (%(label)s)(rate(%(meter)s"
                "{mode='idle',%(label)s='%(label_value)s'}[%(period)ss])) "
                "* 100)"
                % {'label': self.prometheus_fqdn_label,
                   'label_value': instance_label, 'agg': aggregate,
                   'meter': meter, 'period': period}
            )
        elif meter == 'node_memory_MemAvailable_bytes':
            query_args = (
                "(node_memory_MemTotal_bytes{%(label)s='%(label_value)s'} "
                "- %(agg)s_over_time(%(meter)s{%(label)s='%(label_value)s'}"
                "[%(period)ss])) / 1024 / 1024"
                % {'label': self.prometheus_fqdn_label,
                   'label_value': instance_label, 'agg': aggregate,
                   'meter': meter, 'period': period}
            )
        elif meter == 'ceilometer_memory_usage':
            query_args = (
                "%s_over_time(%s{%s='%s'}[%ss])" %
                (aggregate, meter, uuid_label_key, instance_label, period)
            )
        elif meter == 'ceilometer_cpu':
            # We are converting the total cumulative cpu time (ns) to cpu
            # usage percentage so we need to divide between the number of
            # vcpus. As this is a percentage metric, we set a max level of
            # 100. It has been observed in very high usage cases, prometheus
            # reporting values higher than 100 what can lead to unexpected
            # behaviors.
            vcpus = resource.vcpus
            if not vcpus:
                LOG.warning(
                    "instance vcpu count not set for instance %s, assuming 1",
                    instance_label
                )
                vcpus = 1
            query_args = (
                "clamp_max((%s by (instance)(rate(%s{%s='%s'}[%ss]))/10e+8) "
                "*(100/%s), 100)" %
                (aggregate, meter, uuid_label_key, instance_label, period,
                 vcpus)
            )
        else:
            raise exception.InvalidParameter(
                message=(_("Cannot process prometheus meter %s") % meter)
            )

        return query_args

    def check_availability(self):
        """check if Prometheus server is available for queries

        Performs HTTP get on the prometheus API /status/runtimeinfo endpoint.
        The prometheus_client will raise a PrometheusAPIClientError if the
        call is unsuccessful, which is caught here and a warning logged.
        """
        try:
            self.prometheus._get("status/runtimeinfo")
        except prometheus_client.PrometheusAPIClientError:
            LOG.warning(
                "check_availability raised PrometheusAPIClientError. "
                "Is Prometheus server down?"
            )
            return 'not available'
        return 'available'

    def list_metrics(self):
        """Fetch all prometheus metrics from api/v1/label/__name__/values

        The prometheus_client will raise a PrometheusAPIClientError if the
        call is unsuccessful, which is caught here and a warning logged.
        """
        try:
            response = self.prometheus._get("label/__name__/values")
        except prometheus_client.PrometheusAPIClientError:
            LOG.warning(
                "list_metrics raised PrometheusAPIClientError. Is Prometheus "
                "server down?"
            )
            return set()
        return set(response['data'])

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Get the aggregated value of a meter for the given resource.

        :param resource: the resource object (compute node or instance)
        :param resource_type: 'compute_node' or 'instance'
        :param meter_name: a key of METRIC_MAP
        :param period: the query period in seconds
        :param aggregate: a key of AGGREGATES_MAP
        :param granularity: unused by this datasource (kept for interface
            compatibility)
        :return: a float with the metric value, or None if unavailable
        """
        meter = self._get_meter(meter_name)
        query_args = ''
        instance_label = ''

        # For instance resource type, the datasource expects the uuid of the
        # instance to be assigned to a label in the prometheus metrics, with
        # a specific key value.
        if resource_type == 'compute_node':
            instance_label = self._resolve_prometheus_instance_label(
                resource.hostname)
        elif resource_type == 'instance':
            instance_label = resource.uuid
            # For ram_allocated and root_disk size metrics there are no valid
            # values in the prometheus backend store. We rely in the values
            # provided in the vms inventory.
            if meter == 'instance.memory':
                return float(resource.memory)
            elif meter == 'instance.disk':
                return float(resource.disk)
        else:
            LOG.warning(
                "Prometheus data source does not currently support "
                "resource_type %s", resource_type
            )
            return None

        promql_aggregate = self._resolve_prometheus_aggregate(aggregate,
                                                              meter)
        query_args = self._build_prometheus_query(
            promql_aggregate, meter, instance_label, period, resource
        )
        if not query_args:
            LOG.error("Cannot proceed without valid prometheus query")
            return None

        result = self.query_retry(
            self.prometheus.query, query_args,
            ignored_exc=prometheus_client.PrometheusAPIClientError,
        )

        return float(result[0].value) if result else None

    def statistic_series(self, resource=None, resource_type=None,
                         meter_name=None, start_time=None, end_time=None,
                         granularity=300):
        raise NotImplementedError(
            _('Prometheus helper currently does not support statistic_series.'
              ' This can be considered for future enhancement.'))

    def _invert_max_min_aggregate(self, agg):
        """Invert max and min for node/host metric queries from node-exporter

        because we query for 'idle'/'unused' cpu and memory.
        For Watcher 'max cpu used' we query for prometheus 'min idle time'.
        For Watcher 'max memory used' we retrieve min 'unused'/'available'
        memory from Prometheus. This internal function is used exclusively
        by get_host_cpu_usage and get_host_ram_usage.
        :param agg: the metric collection aggregate
        :return: a String aggregate

        """
        if agg == 'max':
            return 'min'
        elif agg == 'min':
            return 'max'
        return agg

    def get_host_cpu_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        """Query prometheus for node_cpu_seconds_total

        This calculates the host cpu usage and returns it as a percentage
        The calculation is made by using the cpu 'idle' time, per
        instance (so all CPUs are included). For example the query looks like
        (100 - (avg by (fqdn)(rate(node_cpu_seconds_total
        {mode='idle',fqdn='compute1.example.com'}[300s])) * 100))
        """
        aggregate = self._invert_max_min_aggregate(aggregate)
        cpu_usage = self.statistic_aggregation(
            resource, 'compute_node',
            'host_cpu_usage', period=period,
            granularity=granularity, aggregate=aggregate)
        # NOTE: 'is not None' rather than truthiness — 0.0 is a valid
        # cpu usage reading and must not be collapsed to None.
        return float(cpu_usage) if cpu_usage is not None else None

    def get_host_ram_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        aggregate = self._invert_max_min_aggregate(aggregate)
        ram_usage = self.statistic_aggregation(
            resource, 'compute_node',
            'host_ram_usage', period=period,
            granularity=granularity, aggregate=aggregate)
        # NOTE: 'is not None' rather than truthiness — 0.0 is a valid
        # ram usage reading and must not be collapsed to None.
        return float(ram_usage) if ram_usage is not None else None

    def get_instance_ram_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        ram_usage = self.statistic_aggregation(
            resource, 'instance',
            'instance_ram_usage', period=period,
            granularity=granularity, aggregate=aggregate)
        return ram_usage

    def get_instance_cpu_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        cpu_usage = self.statistic_aggregation(
            resource, 'instance',
            'instance_cpu_usage', period=period,
            granularity=granularity, aggregate=aggregate)
        return cpu_usage

    def get_instance_ram_allocated(self, resource, period=300,
                                   aggregate="mean", granularity=None):
        ram_allocated = self.statistic_aggregation(
            resource, 'instance',
            'instance_ram_allocated', period=period,
            granularity=granularity, aggregate=aggregate)
        return ram_allocated

    def get_instance_root_disk_size(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        root_disk_size = self.statistic_aggregation(
            resource, 'instance',
            'instance_root_disk_size', period=period,
            granularity=granularity, aggregate=aggregate)
        return root_disk_size