Coverage for watcher/decision_engine/model/collector/nova.py: 87%

184 statements  

coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 Intel Innovation and Research Ireland Ltd. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import os_resource_classes as orc 

17from oslo_log import log 

18 

19from futurist import waiters 

20 

21from watcher.common import nova_helper 

22from watcher.common import placement_helper 

23from watcher.decision_engine.model.collector import base 

24from watcher.decision_engine.model import element 

25from watcher.decision_engine.model import model_root 

26from watcher.decision_engine.model.notification import nova 

27from watcher.decision_engine.scope import compute as compute_scope 

28from watcher.decision_engine import threading 

29 

30LOG = log.getLogger(__name__) 

31 

32 

33class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): 

34 """Nova cluster data model collector 

35 

36 The Nova cluster data model collector creates an in-memory 

37 representation of the resources exposed by the compute service. 

38 """ 

39 

40 HOST_AGGREGATES = "#/items/properties/compute/host_aggregates/" 

41 SCHEMA = { 

42 "$schema": "http://json-schema.org/draft-04/schema#", 

43 "type": "array", 

44 "items": { 

45 "type": "object", 

46 "properties": { 

47 "host_aggregates": { 

48 "type": "array", 

49 "items": { 

50 "anyOf": [ 

51 {"$ref": HOST_AGGREGATES + "host_aggr_id"}, 

52 {"$ref": HOST_AGGREGATES + "name"}, 

53 ] 

54 } 

55 }, 

56 "availability_zones": { 

57 "type": "array", 

58 "items": { 

59 "type": "object", 

60 "properties": { 

61 "name": { 

62 "type": "string" 

63 } 

64 }, 

65 "additionalProperties": False 

66 } 

67 }, 

68 "exclude": { 

69 "type": "array", 

70 "items": { 

71 "type": "object", 

72 "properties": { 

73 "instances": { 

74 "type": "array", 

75 "items": { 

76 "type": "object", 

77 "properties": { 

78 "uuid": { 

79 "type": "string" 

80 } 

81 }, 

82 "additionalProperties": False 

83 } 

84 }, 

85 "compute_nodes": { 

86 "type": "array", 

87 "items": { 

88 "type": "object", 

89 "properties": { 

90 "name": { 

91 "type": "string" 

92 } 

93 }, 

94 "additionalProperties": False 

95 } 

96 }, 

97 "host_aggregates": { 

98 "type": "array", 

99 "items": { 

100 "anyOf": [ 

101 {"$ref": 

102 HOST_AGGREGATES + "host_aggr_id"}, 

103 {"$ref": HOST_AGGREGATES + "name"}, 

104 ] 

105 } 

106 }, 

107 "instance_metadata": { 

108 "type": "array", 

109 "items": { 

110 "type": "object" 

111 } 

112 }, 

113 "projects": { 

114 "type": "array", 

115 "items": { 

116 "type": "object", 

117 "properties": { 

118 "uuid": { 

119 "type": "string" 

120 } 

121 }, 

122 "additionalProperties": False 

123 } 

124 } 

125 }, 

126 "additionalProperties": False 

127 } 

128 } 

129 }, 

130 "additionalProperties": False 

131 }, 

132 "host_aggregates": { 

133 "host_aggr_id": { 

134 "properties": { 

135 "id": { 

136 "oneOf": [ 

137 {"type": "integer"}, 

138 {"enum": ["*"]} 

139 ] 

140 } 

141 }, 

142 "additionalProperties": False 

143 }, 

144 "name": { 

145 "properties": { 

146 "name": { 

147 "type": "string" 

148 } 

149 }, 

150 "additionalProperties": False 

151 } 

152 }, 

153 "additionalProperties": False 

154 } 
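
For reference, a scope document of the shape this schema accepts might look like the following sketch (all ids, names and UUIDs below are invented for illustration; in a full audit scope this list appears under a "compute" key, as handled by _check_model_scope further down):

    example_compute_scope = [
        {"host_aggregates": [{"id": 5}, {"name": "ssd-hosts"}]},
        {"availability_zones": [{"name": "az-1"}]},
        {"exclude": [
            {"instances": [{"uuid": "INSTANCE_UUID"}]},
            {"compute_nodes": [{"name": "compute-3"}]},
        ]},
    ]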

155 

156 def __init__(self, config, osc=None): 

157 super(NovaClusterDataModelCollector, self).__init__(config, osc) 

158 

159 @property 

160 def notification_endpoints(self): 

161 """Associated notification endpoints 

162 

163 :return: Associated notification endpoints 

164 :rtype: List of :py:class:`~.EventsNotificationEndpoint` instances 

165 """ 

166 return [ 

167 nova.VersionedNotification(self), 

168 ] 

169 

170 def get_audit_scope_handler(self, audit_scope): 

171 self._audit_scope_handler = compute_scope.ComputeScope( 

172 audit_scope, self.config) 

173 if self._data_model_scope is None or (  [173 ↛ 178: line 173 didn't jump to line 178 because the condition on line 173 was always true]

174 len(self._data_model_scope) > 0 and ( 

175 self._data_model_scope != audit_scope)): 

176 self._data_model_scope = audit_scope 

177 self._cluster_data_model = None 

178 LOG.debug("audit scope %s", audit_scope) 

179 return self._audit_scope_handler 

180 

181 def execute(self): 

182 """Build the compute cluster data model""" 

183 LOG.debug("Building latest Nova cluster data model") 

184 

185 if self._audit_scope_handler is None:  [185 ↛ 186: line 185 didn't jump to line 186 because the condition on line 185 was never true]

186 LOG.debug("No audit, Don't Build compute data model") 

187 return 

188 if self._data_model_scope is None: 

189 LOG.debug("No audit scope, Don't Build compute data model") 

190 return 

191 

192 builder = NovaModelBuilder(self.osc) 

193 return builder.execute(self._data_model_scope) 

194 

195 

196class NovaModelBuilder(base.BaseModelBuilder): 

197 """Build the graph-based model 

198 

199 This model builder adds the following data: 

200 

201 - Compute-related knowledge (Nova) 

202 - TODO(v-francoise): Network-related knowledge (Neutron) 

203 

204 NOTE(v-francoise): This model builder is meant to be extended in the future 

205 to also include both storage and network information respectively coming 

206 from Cinder and Neutron. Some preliminary work has been done in this 

207 direction in https://review.opendev.org/#/c/362730 but since we cannot 

208 guarantee a sufficient level of consistency for either the storage or the 

209 network part before the end of the Ocata cycle, this work has been 

210 re-scheduled for Pike. In the meantime, all the associated code has been 

211 commented out. 

212 """ 

213 

214 def __init__(self, osc): 

215 self.osc = osc 

216 self.model = None 

217 self.model_scope = dict() 

218 self.no_model_scope_flag = False 

219 self.nova = osc.nova() 

220 self.nova_helper = nova_helper.NovaHelper(osc=self.osc) 

221 self.placement_helper = placement_helper.PlacementHelper(osc=self.osc) 

222 self.executor = threading.DecisionEngineThreadPool() 

223 

224 def _collect_aggregates(self, host_aggregates, _nodes): 

225 if not host_aggregates: 

226 return 

227 

228 aggregate_list = self.call_retry(f=self.nova_helper.get_aggregate_list) 

229 aggregate_ids = [aggregate['id'] for aggregate 

230 in host_aggregates if 'id' in aggregate] 

231 aggregate_names = [aggregate['name'] for aggregate 

232 in host_aggregates if 'name' in aggregate] 

233 include_all_nodes = any('*' in field 

234 for field in (aggregate_ids, aggregate_names)) 

235 

236 for aggregate in aggregate_list: 

237 if (aggregate.id in aggregate_ids or 

238 aggregate.name in aggregate_names or 

239 include_all_nodes): 

240 _nodes.update(aggregate.hosts) 

241 

242 def _collect_zones(self, availability_zones, _nodes): 

243 if not availability_zones: 

244 return 

245 

246 service_list = self.call_retry(f=self.nova_helper.get_service_list) 

247 zone_names = [zone['name'] for zone 

248 in availability_zones] 

249 include_all_nodes = False 

250 if '*' in zone_names:  [250 ↛ 251: line 250 didn't jump to line 251 because the condition on line 250 was never true]

251 include_all_nodes = True 

252 for service in service_list: 

253 if service.zone in zone_names or include_all_nodes: 

254 _nodes.add(service.host) 

255 

256 def _compute_node_future(self, future, future_instances): 

257 """Add compute node information to model and schedule instance info job 

258 

259 :param future: The future from the finished execution 

260 :rtype future: :py:class:`futurist.GreenFuture` 

261 :param future_instances: list of futures for instance jobs 

262 :rtype future_instances: list :py:class:`futurist.GreenFuture` 

263 """ 

264 try: 

265 node_info = future.result()[0] 

266 

267 # filter out baremetal node 

268 if node_info.hypervisor_type == 'ironic': 

269 LOG.debug("filtering out baremetal node: %s", node_info) 

270 return 

271 self.add_compute_node(node_info) 

272 # node.servers is a list of server objects 

273 # New in nova version 2.53 

274 instances = getattr(node_info, "servers", None) 

275 # Do not submit job if there are no instances on compute node 

276 if instances is None:  [276 ↛ 277: line 276 didn't jump to line 277 because the condition on line 276 was never true]

277 LOG.info("No instances on compute_node: %s", node_info) 

278 return 

279 future_instances.append( 

280 self.executor.submit( 

281 self.add_instance_node, node_info, instances) 

282 ) 

283 except Exception: 

284 LOG.error("compute node from aggregate / " 

285 "availability_zone could not be found") 

286 

287 def _add_physical_layer(self): 

288 """Collects all information on compute nodes and instances 

289 

290 Will collect all required compute node and instance information based 

291 on the host aggregates and availability zones. If aggregates and zones 

292 do not specify any compute nodes, all nodes are retrieved instead. 

293 

294 The collection of information happens concurrently using the 

295 DecisionEngineThreadPool. The collection is parallelized in three steps: 

296 first, information about aggregates and zones is gathered. Secondly, 

297 for each of the compute nodes a task is submitted to get detailed 

298 information about the compute node. Finally, each of these submitted 

299 tasks will submit an additional task if the compute node contains 

300 instances. Before returning from this function all instance tasks are 

301 waited upon to complete. 

302 """ 

303 

304 compute_nodes = set() 

305 host_aggregates = self.model_scope.get("host_aggregates") 

306 availability_zones = self.model_scope.get("availability_zones") 

307 

308 """Submit tasks to gather compute nodes from availability zones and 

309 host aggregates. Each task adds compute nodes to the set; this set is 

310 thread-safe under the assumption that CPython is used with the GIL 

311 enabled.""" 

312 zone_aggregate_futures = { 

313 self.executor.submit( 

314 self._collect_aggregates, host_aggregates, compute_nodes), 

315 self.executor.submit( 

316 self._collect_zones, availability_zones, compute_nodes) 

317 } 

318 waiters.wait_for_all(zone_aggregate_futures) 

319 

320 # if zones and aggregates did not contain any nodes get every node. 

321 if not compute_nodes: 

322 self.no_model_scope_flag = True 

323 all_nodes = self.call_retry( 

324 f=self.nova_helper.get_compute_node_list) 

325 compute_nodes = set( 

326 [node.hypervisor_hostname for node in all_nodes]) 

327 LOG.debug("compute nodes: %s", compute_nodes) 

328 

329 node_futures = [self.executor.submit( 

330 self.nova_helper.get_compute_node_by_name, 

331 node, servers=True, detailed=True) 

332 for node in compute_nodes] 

333 LOG.debug("submitted %d jobs", len(compute_nodes)) 

334 

335 # Futures will concurrently be added, only safe with CPython GIL 

336 future_instances = [] 

337 self.executor.do_while_futures_modify( 

338 node_futures, self._compute_node_future, future_instances) 

339 

340 # Wait for all instance jobs to finish 

341 waiters.wait_for_all(future_instances) 
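
A minimal, self-contained sketch of the fan-out/fan-in pattern described above, using plain futurist primitives (the pool size and the fetch_node worker are invented placeholders; Watcher's DecisionEngineThreadPool wraps the same idea):

    import futurist
    from futurist import waiters

    def fetch_node(name):
        # Placeholder for a per-node call such as
        # nova_helper.get_compute_node_by_name(name, servers=True, detailed=True)
        return {"name": name}

    executor = futurist.ThreadPoolExecutor(max_workers=4)
    futures = [executor.submit(fetch_node, n) for n in ("compute-0", "compute-1")]
    waiters.wait_for_all(futures)
    results = [f.result() for f in futures]
    executor.shutdown()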

342 

343 def add_compute_node(self, node): 

344 # Build and add base node. 

345 LOG.debug("node info: %s", node) 

346 compute_node = self.build_compute_node(node) 

347 self.model.add_node(compute_node) 

348 

349 # NOTE(v-francoise): we can encapsulate capabilities of the node 

350 # (special instruction sets of CPUs) in the attributes; sub-nodes can 

351 # also be added representing e.g. GPUs/Accelerators etc. 

352 

353 # # Build & add disk, memory, network and cpu nodes. 

354 # disk_id, disk_node = self.build_disk_compute_node(base_id, node) 

355 # self.add_node(disk_id, disk_node) 

356 # mem_id, mem_node = self.build_memory_compute_node(base_id, node) 

357 # self.add_node(mem_id, mem_node) 

358 # net_id, net_node = self._build_network_compute_node(base_id) 

359 # self.add_node(net_id, net_node) 

360 # cpu_id, cpu_node = self.build_cpu_compute_node(base_id, node) 

361 # self.add_node(cpu_id, cpu_node) 

362 

363 # # Connect the base compute node to the dependent nodes. 

364 # self.add_edges_from([(base_id, disk_id), (base_id, mem_id), 

365 # (base_id, cpu_id), (base_id, net_id)], 

366 # label="contains") 

367 

368 def build_compute_node(self, node): 

369 """Build a compute node from a Nova compute node 

370 

371 :param node: A node hypervisor instance 

372 :type node: :py:class:`~novaclient.v2.hypervisors.Hypervisor` 

373 """ 

374 inventories = self.placement_helper.get_inventories(node.id) 

375 if inventories and orc.VCPU in inventories: 

376 vcpus = inventories[orc.VCPU]['total'] 

377 vcpu_reserved = inventories[orc.VCPU]['reserved'] 

378 vcpu_ratio = inventories[orc.VCPU]['allocation_ratio'] 

379 else: 

380 vcpus = node.vcpus 

381 vcpu_reserved = 0 

382 vcpu_ratio = 1.0 

383 

384 if inventories and orc.MEMORY_MB in inventories: 

385 memory_mb = inventories[orc.MEMORY_MB]['total'] 

386 memory_mb_reserved = inventories[orc.MEMORY_MB]['reserved'] 

387 memory_ratio = inventories[orc.MEMORY_MB]['allocation_ratio'] 

388 else: 

389 memory_mb = node.memory_mb 

390 memory_mb_reserved = 0 

391 memory_ratio = 1.0 

392 

393 # NOTE(licanwei): A nova BP support-shared-storage-resource-provider 

394 # will move DISK_GB from compute node to shared storage RP. 

395 # This may need to be updated when the nova BP is released. 

396 if inventories and orc.DISK_GB in inventories: 

397 disk_capacity = inventories[orc.DISK_GB]['total'] 

398 disk_gb_reserved = inventories[orc.DISK_GB]['reserved'] 

399 disk_ratio = inventories[orc.DISK_GB]['allocation_ratio'] 

400 else: 

401 disk_capacity = node.local_gb 

402 disk_gb_reserved = 0 

403 disk_ratio = 1.0 

404 

405 # build up the compute node. 

406 node_attributes = { 

407 # The id of the hypervisor as a UUID from version 2.53. 

408 "uuid": node.id, 

409 "hostname": node.service["host"], 

410 "memory": memory_mb, 

411 "memory_ratio": memory_ratio, 

412 "memory_mb_reserved": memory_mb_reserved, 

413 "disk": disk_capacity, 

414 "disk_gb_reserved": disk_gb_reserved, 

415 "disk_ratio": disk_ratio, 

416 "vcpus": vcpus, 

417 "vcpu_reserved": vcpu_reserved, 

418 "vcpu_ratio": vcpu_ratio, 

419 "state": node.state, 

420 "status": node.status, 

421 "disabled_reason": node.service["disabled_reason"]} 

422 

423 compute_node = element.ComputeNode(**node_attributes) 

424 # compute_node = self._build_node("physical", "compute", "hypervisor", 

425 # node_attributes) 

426 return compute_node 
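
The placement inventories consumed above are keyed by resource class; a sketch of the shape this code relies on, showing only the keys it reads and with invented values:

    inventories = {
        orc.VCPU: {"total": 64, "reserved": 4, "allocation_ratio": 16.0},
        orc.MEMORY_MB: {"total": 262144, "reserved": 8192, "allocation_ratio": 1.5},
        orc.DISK_GB: {"total": 1800, "reserved": 100, "allocation_ratio": 1.0},
    }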

427 

428 def add_instance_node(self, node, instances): 

429 if instances is None:  [429 ↛ 430: line 429 didn't jump to line 430 because the condition on line 429 was never true]

430 LOG.info("no instances on compute_node: %s", node) 

431 return 

432 host = node.service["host"] 

433 compute_node = self.model.get_node_by_uuid(node.id) 

434 filters = {'host': host} 

435 limit = len(instances) if len(instances) <= 1000 else -1 

436 # Get all servers on this compute host. 

437 # Note that the advantage of passing the limit parameter is 

438 # that it can speed up the call time of novaclient. 1000 is 

439 # the default maximum number of return servers provided by 

440 # compute API. If we need to request more than 1000 servers, 

441 # we can set limit=-1. For details, please see: 

442 # https://bugs.launchpad.net/watcher/+bug/1834679 

443 instances = self.call_retry(f=self.nova_helper.get_instance_list, 

444 filters=filters, limit=limit) 

445 for inst in instances: 

446 # skip deleted instance 

447 if getattr(inst, "OS-EXT-STS:vm_state") == ( 

448 element.InstanceState.DELETED.value): 

449 continue 

450 # Add Node 

451 instance = self._build_instance_node(inst) 

452 self.model.add_instance(instance) 

453 # Connect the instance to its compute node 

454 self.model.map_instance(instance, compute_node) 

455 

456 def _build_instance_node(self, instance): 

457 """Build an instance node 

458 

459 Create an instance node for the graph using nova and the 

460 `server` nova object. 

461 :param instance: Nova VM object. 

462 :return: An instance node for the graph. 

463 """ 

464 flavor = instance.flavor 

465 instance_attributes = { 

466 "uuid": instance.id, 

467 "name": instance.name, 

468 "memory": flavor["ram"], 

469 "disk": flavor["disk"], 

470 "vcpus": flavor["vcpus"], 

471 "state": getattr(instance, "OS-EXT-STS:vm_state"), 

472 "metadata": instance.metadata, 

473 "project_id": instance.tenant_id, 

474 "locked": instance.locked} 

475 

476 # node_attributes = dict() 

477 # node_attributes["layer"] = "virtual" 

478 # node_attributes["category"] = "compute" 

479 # node_attributes["type"] = "compute" 

480 # node_attributes["attributes"] = instance_attributes 

481 return element.Instance(**instance_attributes) 

482 

483 def _merge_compute_scope(self, compute_scope): 

484 model_keys = self.model_scope.keys() 

485 update_flag = False 

486 

487 role_keys = ("host_aggregates", "availability_zones") 

488 for role in compute_scope: 

489 role_key = list(role.keys())[0] 

490 if role_key not in role_keys:  [490 ↛ 491: line 490 didn't jump to line 491 because the condition on line 490 was never true]

491 continue 

492 role_values = list(role.values())[0] 

493 if role_key in model_keys: 

494 for value in role_values: 

495 if value not in self.model_scope[role_key]: 

496 self.model_scope[role_key].append(value) 

497 update_flag = True 

498 else: 

499 self.model_scope[role_key] = role_values 

500 update_flag = True 

501 return update_flag 
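
A short illustration of the merge behaviour (values invented; builder stands for a NovaModelBuilder instance):

    builder.model_scope = {"availability_zones": [{"name": "az-1"}]}
    changed = builder._merge_compute_scope([
        {"availability_zones": [{"name": "az-1"}, {"name": "az-2"}]},
        {"host_aggregates": [{"id": 5}]},
    ])
    # changed is True: {"name": "az-2"} was appended to the existing
    # "availability_zones" entry and "host_aggregates" was added as a new key.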

502 

503 def _check_model_scope(self, model_scope): 

504 compute_scope = [] 

505 update_flag = False 

506 for _scope in model_scope: 

507 if 'compute' in _scope:  [507 ↛ 506: line 507 didn't jump to line 506 because the condition on line 507 was always true]

508 compute_scope = _scope['compute'] 

509 break 

510 

511 if self.no_model_scope_flag is False:  [511 ↛ 518: line 511 didn't jump to line 518 because the condition on line 511 was always true]

512 if compute_scope: 

513 update_flag = self._merge_compute_scope(compute_scope) 

514 else: 

515 self.model_scope = dict() 

516 update_flag = True 

517 

518 return update_flag 

519 

520 def execute(self, model_scope): 

521 """Instantiates the graph with the openstack cluster data.""" 

522 

523 update_model_flag = self._check_model_scope(model_scope) 

524 if self.model is None or update_model_flag:  [524 ↛ 528: line 524 didn't jump to line 528 because the condition on line 524 was always true]

525 self.model = self.model or model_root.ModelRoot() 

526 self._add_physical_layer() 

527 

528 return self.model
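
Putting the pieces together, the collector drives the builder roughly as shown in this sketch (osc stands for a configured OpenStack clients handle and the scope values are invented; the call issues real Nova and Placement requests):

    builder = NovaModelBuilder(osc)
    audit_scope = [{"compute": [{"availability_zones": [{"name": "az-1"}]}]}]
    model = builder.execute(audit_scope)  # a model_root.ModelRoot instance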