Coverage for watcher/decision_engine/model/collector/nova.py: 87%
184 statements
coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2017 Intel Innovation and Research Ireland Ltd.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
16import os_resource_classes as orc
17from oslo_log import log
19from futurist import waiters
21from watcher.common import nova_helper
22from watcher.common import placement_helper
23from watcher.decision_engine.model.collector import base
24from watcher.decision_engine.model import element
25from watcher.decision_engine.model import model_root
26from watcher.decision_engine.model.notification import nova
27from watcher.decision_engine.scope import compute as compute_scope
28from watcher.decision_engine import threading
30LOG = log.getLogger(__name__)
33class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
34 """Nova cluster data model collector
36 The Nova cluster data model collector creates an in-memory
37 representation of the resources exposed by the compute service.
38 """
40 HOST_AGGREGATES = "#/items/properties/compute/host_aggregates/"
41 SCHEMA = {
42 "$schema": "http://json-schema.org/draft-04/schema#",
43 "type": "array",
44 "items": {
45 "type": "object",
46 "properties": {
47 "host_aggregates": {
48 "type": "array",
49 "items": {
50 "anyOf": [
51 {"$ref": HOST_AGGREGATES + "host_aggr_id"},
52 {"$ref": HOST_AGGREGATES + "name"},
53 ]
54 }
55 },
56 "availability_zones": {
57 "type": "array",
58 "items": {
59 "type": "object",
60 "properties": {
61 "name": {
62 "type": "string"
63 }
64 },
65 "additionalProperties": False
66 }
67 },
68 "exclude": {
69 "type": "array",
70 "items": {
71 "type": "object",
72 "properties": {
73 "instances": {
74 "type": "array",
75 "items": {
76 "type": "object",
77 "properties": {
78 "uuid": {
79 "type": "string"
80 }
81 },
82 "additionalProperties": False
83 }
84 },
85 "compute_nodes": {
86 "type": "array",
87 "items": {
88 "type": "object",
89 "properties": {
90 "name": {
91 "type": "string"
92 }
93 },
94 "additionalProperties": False
95 }
96 },
97 "host_aggregates": {
98 "type": "array",
99 "items": {
100 "anyOf": [
101 {"$ref":
102 HOST_AGGREGATES + "host_aggr_id"},
103 {"$ref": HOST_AGGREGATES + "name"},
104 ]
105 }
106 },
107 "instance_metadata": {
108 "type": "array",
109 "items": {
110 "type": "object"
111 }
112 },
113 "projects": {
114 "type": "array",
115 "items": {
116 "type": "object",
117 "properties": {
118 "uuid": {
119 "type": "string"
120 }
121 },
122 "additionalProperties": False
123 }
124 }
125 },
126 "additionalProperties": False
127 }
128 }
129 },
130 "additionalProperties": False
131 },
132 "host_aggregates": {
133 "host_aggr_id": {
134 "properties": {
135 "id": {
136 "oneOf": [
137 {"type": "integer"},
138 {"enum": ["*"]}
139 ]
140 }
141 },
142 "additionalProperties": False
143 },
144 "name": {
145 "properties": {
146 "name": {
147 "type": "string"
148 }
149 },
150 "additionalProperties": False
151 }
152 },
153 "additionalProperties": False
154 }
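# Illustrative example only (not part of the original source): a "compute"
# scope fragment of the shape this schema describes might look like
#
#   [{"host_aggregates": [{"id": 5}, {"name": "ssd-aggregate"}]},
#    {"availability_zones": [{"name": "AZ1"}]},
#    {"exclude": [{"instances": [{"uuid": "some-instance-uuid"}]}]}]
#
# A "*" host aggregate id or availability zone name selects every aggregate
# or zone (see _collect_aggregates and _collect_zones below).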
156 def __init__(self, config, osc=None):
157 super(NovaClusterDataModelCollector, self).__init__(config, osc)
159 @property
160 def notification_endpoints(self):
161 """Associated notification endpoints
163 :return: Associated notification endpoints
164 :rtype: List of :py:class:`~.EventsNotificationEndpoint` instances
165 """
166 return [
167 nova.VersionedNotification(self),
168 ]
170 def get_audit_scope_handler(self, audit_scope):
171 self._audit_scope_handler = compute_scope.ComputeScope(
172 audit_scope, self.config)
173 if self._data_model_scope is None or (    [173 ↛ 178: line 173 didn't jump to line 178 because the condition on line 173 was always true]
174 len(self._data_model_scope) > 0 and (
175 self._data_model_scope != audit_scope)):
176 self._data_model_scope = audit_scope
177 self._cluster_data_model = None
178 LOG.debug("audit scope %s", audit_scope)
179 return self._audit_scope_handler
181 def execute(self):
182 """Build the compute cluster data model"""
183 LOG.debug("Building latest Nova cluster data model")
185 if self._audit_scope_handler is None:    [185 ↛ 186: line 185 didn't jump to line 186 because the condition on line 185 was never true]
186 LOG.debug("No audit, not building compute data model")
187 return
188 if self._data_model_scope is None:
189 LOG.debug("No audit scope, not building compute data model")
190 return
192 builder = NovaModelBuilder(self.osc)
193 return builder.execute(self._data_model_scope)
196class NovaModelBuilder(base.BaseModelBuilder):
197 """Build the graph-based model
199 This model builder adds the following data:
201 - Compute-related knowledge (Nova)
202 - TODO(v-francoise): Network-related knowledge (Neutron)
204 NOTE(v-francoise): This model builder is meant to be extended in the future
205 to also include both storage and network information respectively coming
206 from Cinder and Neutron. Some preliminary work has been done in this
207 direction in https://review.opendev.org/#/c/362730 but since we cannot
208 guarantee a sufficient level of consistency for either the storage or the
209 network part before the end of the Ocata cycle, this work has been
210 re-scheduled for Pike. In the meantime, all the associated code has been
211 commented out.
212 """
214 def __init__(self, osc):
215 self.osc = osc
216 self.model = None
217 self.model_scope = dict()
218 self.no_model_scope_flag = False
219 self.nova = osc.nova()
220 self.nova_helper = nova_helper.NovaHelper(osc=self.osc)
221 self.placement_helper = placement_helper.PlacementHelper(osc=self.osc)
222 self.executor = threading.DecisionEngineThreadPool()
224 def _collect_aggregates(self, host_aggregates, _nodes):
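# host_aggregates comes from the audit scope, e.g. (illustrative values
# only): [{'id': 5}, {'name': 'ssd-aggregate'}], or [{'id': '*'}] to select
# the hosts of every aggregate.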
225 if not host_aggregates:
226 return
228 aggregate_list = self.call_retry(f=self.nova_helper.get_aggregate_list)
229 aggregate_ids = [aggregate['id'] for aggregate
230 in host_aggregates if 'id' in aggregate]
231 aggregate_names = [aggregate['name'] for aggregate
232 in host_aggregates if 'name' in aggregate]
233 include_all_nodes = any('*' in field
234 for field in (aggregate_ids, aggregate_names))
236 for aggregate in aggregate_list:
237 if (aggregate.id in aggregate_ids or
238 aggregate.name in aggregate_names or
239 include_all_nodes):
240 _nodes.update(aggregate.hosts)
242 def _collect_zones(self, availability_zones, _nodes):
243 if not availability_zones:
244 return
246 service_list = self.call_retry(f=self.nova_helper.get_service_list)
247 zone_names = [zone['name'] for zone
248 in availability_zones]
249 include_all_nodes = False
250 if '*' in zone_names:    [250 ↛ 251: line 250 didn't jump to line 251 because the condition on line 250 was never true]
251 include_all_nodes = True
252 for service in service_list:
253 if service.zone in zone_names or include_all_nodes:
254 _nodes.add(service.host)
256 def _compute_node_future(self, future, future_instances):
257 """Add compute node information to model and schedule instance info job
259 :param future: The future from the finished execution
260 :type future: :py:class:`futurist.GreenFuture`
261 :param future_instances: list of futures for instance jobs
262 :type future_instances: list of :py:class:`futurist.GreenFuture`
263 """
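# Used as the callback passed to executor.do_while_futures_modify() in
# _add_physical_layer below; each finished compute node future lands here.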
264 try:
265 node_info = future.result()[0]
267 # filter out baremetal node
268 if node_info.hypervisor_type == 'ironic':
269 LOG.debug("filtering out baremetal node: %s", node_info)
270 return
271 self.add_compute_node(node_info)
272 # node.servers is a list of server objects
273 # New in nova version 2.53
274 instances = getattr(node_info, "servers", None)
275 # Do not submit job if there are no instances on compute node
276 if instances is None:    [276 ↛ 277: line 276 didn't jump to line 277 because the condition on line 276 was never true]
277 LOG.info("No instances on compute_node: %s", node_info)
278 return
279 future_instances.append(
280 self.executor.submit(
281 self.add_instance_node, node_info, instances)
282 )
283 except Exception:
284 LOG.error("compute node from aggregate / "
285 "availability_zone could not be found")
287 def _add_physical_layer(self):
288 """Collects all information on compute nodes and instances
290 Will collect all required compute node and instance information based
291 on the host aggregates and availability zones. If aggregates and zones
292 do not specify any compute nodes, all nodes are retrieved instead.
294 The collection of information happens concurrently using the
295 DecisionEngineThreadPool. The collection is parallelized in three steps:
296 first, information about aggregates and zones is gathered. Secondly,
297 for each of the compute nodes a task is submitted to get detailed
298 information about the compute node. Finally, each of these submitted
299 tasks will submit an additional task if the compute node contains
300 instances. Before returning from this function all instance tasks are
301 waited upon to complete.
302 """
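# Rough flow (sketch, matching the steps described in the docstring above):
#   1. two futures fill the compute_nodes set from aggregates and zones
#   2. one future per compute node fetches that node's details
#   3. _compute_node_future schedules add_instance_node futures, which are
#      waited on before returning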
304 compute_nodes = set()
305 host_aggregates = self.model_scope.get("host_aggregates")
306 availability_zones = self.model_scope.get("availability_zones")
308 """Submit tasks to gather compute nodes from availability zones and
309 host aggregates. Each task adds compute nodes to the set; this set is
310 thread-safe under the assumption that CPython is used with the GIL
311 enabled."""
312 zone_aggregate_futures = {
313 self.executor.submit(
314 self._collect_aggregates, host_aggregates, compute_nodes),
315 self.executor.submit(
316 self._collect_zones, availability_zones, compute_nodes)
317 }
318 waiters.wait_for_all(zone_aggregate_futures)
320 # if zones and aggregates did not contain any nodes get every node.
321 if not compute_nodes:
322 self.no_model_scope_flag = True
323 all_nodes = self.call_retry(
324 f=self.nova_helper.get_compute_node_list)
325 compute_nodes = set(
326 [node.hypervisor_hostname for node in all_nodes])
327 LOG.debug("compute nodes: %s", compute_nodes)
329 node_futures = [self.executor.submit(
330 self.nova_helper.get_compute_node_by_name,
331 node, servers=True, detailed=True)
332 for node in compute_nodes]
333 LOG.debug("submitted %d jobs", len(compute_nodes))
335 # Futures will concurrently be added, only safe with CPython GIL
336 future_instances = []
337 self.executor.do_while_futures_modify(
338 node_futures, self._compute_node_future, future_instances)
340 # Wait for all instance jobs to finish
341 waiters.wait_for_all(future_instances)
343 def add_compute_node(self, node):
344 # Build and add base node.
345 LOG.debug("node info: %s", node)
346 compute_node = self.build_compute_node(node)
347 self.model.add_node(compute_node)
349 # NOTE(v-francoise): we can encapsulate capabilities of the node
350 # (special instruction sets of CPUs) in the attributes; sub-nodes can
351 # also be added representing e.g. GPUs/Accelerators etc.
353 # # Build & add disk, memory, network and cpu nodes.
354 # disk_id, disk_node = self.build_disk_compute_node(base_id, node)
355 # self.add_node(disk_id, disk_node)
356 # mem_id, mem_node = self.build_memory_compute_node(base_id, node)
357 # self.add_node(mem_id, mem_node)
358 # net_id, net_node = self._build_network_compute_node(base_id)
359 # self.add_node(net_id, net_node)
360 # cpu_id, cpu_node = self.build_cpu_compute_node(base_id, node)
361 # self.add_node(cpu_id, cpu_node)
363 # # Connect the base compute node to the dependent nodes.
364 # self.add_edges_from([(base_id, disk_id), (base_id, mem_id),
365 # (base_id, cpu_id), (base_id, net_id)],
366 # label="contains")
368 def build_compute_node(self, node):
369 """Build a compute node from a Nova compute node
371 :param node: A node hypervisor instance
372 :type node: :py:class:`~novaclient.v2.hypervisors.Hypervisor`
373 """
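# The placement inventories are expected to be keyed by resource class,
# e.g. (illustrative values only):
#   {orc.VCPU: {'total': 32, 'reserved': 0, 'allocation_ratio': 16.0},
#    orc.MEMORY_MB: {...}, orc.DISK_GB: {...}}
# When a resource class is missing, the hypervisor's own fields are used as
# a fallback with no reservation and a 1.0 allocation ratio.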
374 inventories = self.placement_helper.get_inventories(node.id)
375 if inventories and orc.VCPU in inventories:
376 vcpus = inventories[orc.VCPU]['total']
377 vcpu_reserved = inventories[orc.VCPU]['reserved']
378 vcpu_ratio = inventories[orc.VCPU]['allocation_ratio']
379 else:
380 vcpus = node.vcpus
381 vcpu_reserved = 0
382 vcpu_ratio = 1.0
384 if inventories and orc.MEMORY_MB in inventories:
385 memory_mb = inventories[orc.MEMORY_MB]['total']
386 memory_mb_reserved = inventories[orc.MEMORY_MB]['reserved']
387 memory_ratio = inventories[orc.MEMORY_MB]['allocation_ratio']
388 else:
389 memory_mb = node.memory_mb
390 memory_mb_reserved = 0
391 memory_ratio = 1.0
393 # NOTE(licanwei): A nova BP support-shared-storage-resource-provider
394 # will move DISK_GB from compute node to shared storage RP.
395 # This may need to be updated when the nova BP is released.
396 if inventories and orc.DISK_GB in inventories:
397 disk_capacity = inventories[orc.DISK_GB]['total']
398 disk_gb_reserved = inventories[orc.DISK_GB]['reserved']
399 disk_ratio = inventories[orc.DISK_GB]['allocation_ratio']
400 else:
401 disk_capacity = node.local_gb
402 disk_gb_reserved = 0
403 disk_ratio = 1.0
405 # build up the compute node.
406 node_attributes = {
407 # The id of the hypervisor as a UUID from version 2.53.
408 "uuid": node.id,
409 "hostname": node.service["host"],
410 "memory": memory_mb,
411 "memory_ratio": memory_ratio,
412 "memory_mb_reserved": memory_mb_reserved,
413 "disk": disk_capacity,
414 "disk_gb_reserved": disk_gb_reserved,
415 "disk_ratio": disk_ratio,
416 "vcpus": vcpus,
417 "vcpu_reserved": vcpu_reserved,
418 "vcpu_ratio": vcpu_ratio,
419 "state": node.state,
420 "status": node.status,
421 "disabled_reason": node.service["disabled_reason"]}
423 compute_node = element.ComputeNode(**node_attributes)
424 # compute_node = self._build_node("physical", "compute", "hypervisor",
425 # node_attributes)
426 return compute_node
428 def add_instance_node(self, node, instances):
429 if instances is None:    [429 ↛ 430: line 429 didn't jump to line 430 because the condition on line 429 was never true]
430 LOG.info("no instances on compute_node: %s", node)
431 return
432 host = node.service["host"]
433 compute_node = self.model.get_node_by_uuid(node.id)
434 filters = {'host': host}
435 limit = len(instances) if len(instances) <= 1000 else -1
436 # Get all servers on this compute host.
437 # Note that the advantage of passing the limit parameter is
438 # that it can speed up the call time of novaclient. 1000 is
439 # the default maximum number of return servers provided by
440 # compute API. If we need to request more than 1000 servers,
441 # we can set limit=-1. For details, please see:
442 # https://bugs.launchpad.net/watcher/+bug/1834679
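# For example (illustrative): 3 instances on the host gives limit=3, while
# 1500 instances gives limit=-1 (no limit).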
443 instances = self.call_retry(f=self.nova_helper.get_instance_list,
444 filters=filters, limit=limit)
445 for inst in instances:
446 # skip deleted instance
447 if getattr(inst, "OS-EXT-STS:vm_state") == (
448 element.InstanceState.DELETED.value):
449 continue
450 # Add Node
451 instance = self._build_instance_node(inst)
452 self.model.add_instance(instance)
453 # Connect the instance to its compute node
454 self.model.map_instance(instance, compute_node)
456 def _build_instance_node(self, instance):
457 """Build an instance node
459 Create an instance node for the graph using nova and the
460 `server` nova object.
461 :param instance: Nova VM object.
462 :return: An instance node for the graph.
463 """
464 flavor = instance.flavor
465 instance_attributes = {
466 "uuid": instance.id,
467 "name": instance.name,
468 "memory": flavor["ram"],
469 "disk": flavor["disk"],
470 "vcpus": flavor["vcpus"],
471 "state": getattr(instance, "OS-EXT-STS:vm_state"),
472 "metadata": instance.metadata,
473 "project_id": instance.tenant_id,
474 "locked": instance.locked}
476 # node_attributes = dict()
477 # node_attributes["layer"] = "virtual"
478 # node_attributes["category"] = "compute"
479 # node_attributes["type"] = "compute"
480 # node_attributes["attributes"] = instance_attributes
481 return element.Instance(**instance_attributes)
483 def _merge_compute_scope(self, compute_scope):
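# Illustrative example (not from the original source): merging
# [{'host_aggregates': [{'id': 5}]}] into a model_scope that already holds
# {'host_aggregates': [{'id': 1}]} yields
# {'host_aggregates': [{'id': 1}, {'id': 5}]} and returns True.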
484 model_keys = self.model_scope.keys()
485 update_flag = False
487 role_keys = ("host_aggregates", "availability_zones")
488 for role in compute_scope:
489 role_key = list(role.keys())[0]
490 if role_key not in role_keys:    [490 ↛ 491: line 490 didn't jump to line 491 because the condition on line 490 was never true]
491 continue
492 role_values = list(role.values())[0]
493 if role_key in model_keys:
494 for value in role_values:
495 if value not in self.model_scope[role_key]:
496 self.model_scope[role_key].append(value)
497 update_flag = True
498 else:
499 self.model_scope[role_key] = role_values
500 update_flag = True
501 return update_flag
503 def _check_model_scope(self, model_scope):
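# model_scope is the audit scope list; an illustrative shape is
# [{'compute': [{'host_aggregates': [...]}, {'availability_zones': [...]}]}],
# of which only the 'compute' entry is used here.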
504 compute_scope = []
505 update_flag = False
506 for _scope in model_scope:
507 if 'compute' in _scope:    [507 ↛ 506: line 507 didn't jump to line 506 because the condition on line 507 was always true]
508 compute_scope = _scope['compute']
509 break
511 if self.no_model_scope_flag is False:    [511 ↛ 518: line 511 didn't jump to line 518 because the condition on line 511 was always true]
512 if compute_scope:
513 update_flag = self._merge_compute_scope(compute_scope)
514 else:
515 self.model_scope = dict()
516 update_flag = True
518 return update_flag
520 def execute(self, model_scope):
521 """Instantiates the graph with the openstack cluster data."""
523 update_model_flag = self._check_model_scope(model_scope)
524 if self.model is None or update_model_flag:    [524 ↛ 528: line 524 didn't jump to line 528 because the condition on line 524 was always true]
525 self.model = self.model or model_root.ModelRoot()
526 self._add_physical_layer()
528 return self.model