Coverage for watcher/decision_engine/model/notification/nova.py: 84%
202 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2016 b<>com
3#
4# Authors: Vincent FRANCOISE <Vincent.FRANCOISE@b-com.com>
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
19import os_resource_classes as orc
20from oslo_log import log
21from watcher.common import exception
22from watcher.common import nova_helper
23from watcher.common import placement_helper
24from watcher.common import utils
25from watcher.decision_engine.model import element
26from watcher.decision_engine.model.notification import base
27from watcher.decision_engine.model.notification import filtering
29LOG = log.getLogger(__name__)
class NovaNotification(base.NotificationEndpoint):
    """Notification endpoint that mirrors Nova events into the data model.

    The methods below read and mutate ``self.cluster_data_model``, which is
    not defined in this class (presumably supplied by the base endpoint /
    collector -- confirm against ``base.NotificationEndpoint``).
    """

    def __init__(self, collector):
        """Initialise the endpoint; API helpers are created lazily."""
        super(NovaNotification, self).__init__(collector)
        self._nova = None
        self._placement_helper = None

    @property
    def nova(self):
        """Return the ``NovaHelper``, instantiating it on first access."""
        if self._nova is None:
            self._nova = nova_helper.NovaHelper()
        return self._nova

    @property
    def placement_helper(self):
        """Return the ``PlacementHelper``, instantiating it on first access."""
        if self._placement_helper is None:
            self._placement_helper = placement_helper.PlacementHelper()
        return self._placement_helper

    def get_or_create_instance(self, instance_uuid, node_name=None):
        """Return the model instance for ``instance_uuid``, creating it if new.

        :param instance_uuid: UUID of the instance to look up.
        :param node_name: optional compute node UUID or name; when given and
            resolvable, a newly created instance is mapped to that node.
        :returns: the existing or freshly created ``element.Instance``.
        """
        node = None
        try:
            if node_name:
                node = self.get_or_create_node(node_name)
        except exception.ComputeNodeNotFound:
            # Not fatal: the instance is still tracked, just left unmapped.
            LOG.warning("Could not find compute node %(node)s for "
                        "instance %(instance)s",
                        dict(node=node_name, instance=instance_uuid))
        try:
            instance = self.cluster_data_model.get_instance_by_uuid(
                instance_uuid)
        except exception.InstanceNotFound:
            # The instance didn't exist yet so we create a new instance object
            LOG.debug("New instance created: %s", instance_uuid)
            instance = element.Instance(uuid=instance_uuid)

            self.cluster_data_model.add_instance(instance)
            if node:
                self.cluster_data_model.map_instance(instance, node)

        return instance

    def update_instance(self, instance, data):
        """Refresh ``instance`` from a versioned notification payload.

        :param instance: the model instance to update in place.
        :param data: payload dict; ``'nova_object.version'`` and
            ``'nova_object.data'`` (including its ``'flavor'``) are read.
        """
        # Compare versions as integer tuples: float parsing would treat
        # "1.10" as 1.1 and mis-order it against e.g. "1.9".
        n_version = tuple(
            int(part)
            for part in str(data['nova_object.version']).split('.'))
        instance_data = data['nova_object.data']
        instance_flavor_data = instance_data['flavor']['nova_object.data']

        instance.update({
            'state': instance_data['state'],
            'hostname': instance_data['host_name'],
            # this is the user-provided display name of the server which is
            # not guaranteed to be unique nor is it immutable.
            'name': instance_data['display_name'],
            'memory': instance_flavor_data['memory_mb'],
            'vcpus': instance_flavor_data['vcpus'],
            'disk': instance_flavor_data['root_gb'],
            'metadata': instance_data['metadata'],
            'project_id': instance_data['tenant_id']
        })
        # locked was added in nova notification payload version 1.1
        if n_version > (1, 0):
            instance.update({'locked': instance_data['locked']})

        try:
            node = self.get_or_create_node(instance_data['host'])
        except exception.ComputeNodeNotFound as exc:
            LOG.exception(exc)
            # If we can't create the node, we consider the instance as
            # unmapped
            node = None

        self.update_instance_mapping(instance, node)

    def update_compute_node(self, node, data):
        """Update the compute node using the notification data.

        :param node: the model compute node to update in place.
        :param data: payload dict whose ``'nova_object.data'`` provides
            ``host``, ``forced_down``, ``disabled`` and ``disabled_reason``.
        """
        node_data = data['nova_object.data']
        is_disabled = node_data['disabled']

        if node_data['forced_down']:
            node_state = element.ServiceState.OFFLINE.value
        else:
            node_state = element.ServiceState.ONLINE.value

        if is_disabled:
            node_status = element.ServiceState.DISABLED.value
            disabled_reason = node_data['disabled_reason']
        else:
            node_status = element.ServiceState.ENABLED.value
            # A reason is only meaningful while the service is disabled.
            disabled_reason = None

        node.update({
            'hostname': node_data['host'],
            'state': node_state,
            'status': node_status,
            'disabled_reason': disabled_reason,
        })

    @staticmethod
    def _resource_capacity(inventories, resource_class, hypervisor,
                           fallback_attr):
        """Return ``(total, reserved, allocation_ratio)`` for one resource.

        Values come from the placement inventory when it lists
        ``resource_class``; otherwise the total is read from
        ``hypervisor.<fallback_attr>`` with nothing reserved and a 1.0 ratio.
        """
        if inventories and resource_class in inventories:
            inventory = inventories[resource_class]
            return (inventory['total'],
                    inventory['reserved'],
                    inventory['allocation_ratio'])
        # The attribute is only read on this path, as in the original code.
        return getattr(hypervisor, fallback_attr), 0, 1.0

    def create_compute_node(self, uuid_or_name):
        """Create the computeNode node.

        The hypervisor is fetched from Nova (by UUID or by hostname), its
        capacities are taken from placement inventories when available, and
        the resulting node is added to the cluster data model.

        :param uuid_or_name: compute node UUID or hostname.
        :returns: the newly added ``element.ComputeNode``.
        :raises: exception.ComputeNodeNotFound if any step fails.
        """
        try:
            if utils.is_uuid_like(uuid_or_name):
                _node = self.nova.get_compute_node_by_uuid(uuid_or_name)
            else:
                _node = self.nova.get_compute_node_by_hostname(uuid_or_name)
            inventories = self.placement_helper.get_inventories(_node.id)

            vcpus, vcpu_reserved, vcpu_ratio = self._resource_capacity(
                inventories, orc.VCPU, _node, 'vcpus')
            memory_mb, memory_mb_reserved, memory_ratio = (
                self._resource_capacity(
                    inventories, orc.MEMORY_MB, _node, 'memory_mb'))
            # NOTE(licanwei): A BP support-shared-storage-resource-provider
            # will move DISK_GB from compute node to shared storage RP.
            # Here may need to be updated when the nova BP released.
            disk_capacity, disk_gb_reserved, disk_ratio = (
                self._resource_capacity(
                    inventories, orc.DISK_GB, _node, 'local_gb'))

            # build up the compute node.
            node_attributes = {
                # The id of the hypervisor as a UUID from version 2.53.
                "uuid": _node.id,
                "hostname": _node.service["host"],
                "memory": memory_mb,
                "memory_ratio": memory_ratio,
                "memory_mb_reserved": memory_mb_reserved,
                "disk": disk_capacity,
                "disk_gb_reserved": disk_gb_reserved,
                "disk_ratio": disk_ratio,
                "vcpus": vcpus,
                "vcpu_reserved": vcpu_reserved,
                "vcpu_ratio": vcpu_ratio,
                "state": _node.state,
                "status": _node.status,
                "disabled_reason": _node.service["disabled_reason"]}

            node = element.ComputeNode(**node_attributes)
            self.cluster_data_model.add_node(node)
            LOG.debug("New compute node mapped: %s", node.uuid)
            return node
        except Exception as exc:
            # Any failure is reported to callers as "node not found"; the
            # original error is logged and kept as the exception cause.
            LOG.exception(exc)
            LOG.debug("Could not refresh the node %s.", uuid_or_name)
            raise exception.ComputeNodeNotFound(name=uuid_or_name) from exc

    def get_or_create_node(self, uuid_or_name):
        """Return the model node for ``uuid_or_name``, creating it if missing.

        :param uuid_or_name: compute node UUID or name; ``None`` is skipped.
        :returns: the compute node, or ``None`` when no identifier is given.
        :raises: exception.ComputeNodeNotFound if the node is neither in the
            model nor creatable via :meth:`create_compute_node`.
        """
        if uuid_or_name is None:
            LOG.debug("Compute node UUID or name not provided: skipping")
            return
        try:
            if utils.is_uuid_like(uuid_or_name):
                return self.cluster_data_model.get_node_by_uuid(uuid_or_name)
            else:
                return self.cluster_data_model.get_node_by_name(uuid_or_name)
        except exception.ComputeNodeNotFound:
            # The node didn't exist yet so we create a new node object
            node = self.create_compute_node(uuid_or_name)
            LOG.debug("New compute node created: %s", uuid_or_name)
            return node

    def update_instance_mapping(self, instance, node):
        """Make ``instance`` mapped to ``node`` in the cluster data model.

        :param instance: the model instance to (re)map.
        :param node: target compute node; when ``None`` the instance is only
            added to the model and left unmapped.
        """
        if node is None:
            self.cluster_data_model.add_instance(instance)
            LOG.debug("Instance %s not yet attached to any node: skipping",
                      instance.uuid)
            return
        try:
            try:
                current_node = (
                    self.cluster_data_model.get_node_by_instance_uuid(
                        instance.uuid))
            except exception.ComputeResourceNotFound as exc:
                LOG.exception(exc)
                # If the current mapping can't be resolved, we consider the
                # instance as unmapped
                current_node = None

            LOG.debug("Mapped node %s found", node.uuid)
            if current_node and node != current_node:
                # Log the node being left, not the destination node.
                LOG.debug("Unmapping instance %s from %s",
                          instance.uuid, current_node.uuid)
                self.cluster_data_model.unmap_instance(instance, current_node)
        except exception.InstanceNotFound:
            # The instance didn't exist yet so we map it for the first time
            LOG.debug("New instance: mapping it to %s", node.uuid)
        finally:
            # ``node`` cannot be None here (handled by the guard above), so
            # the mapping is always (re)applied, even if an error propagates.
            self.cluster_data_model.map_instance(instance, node)
            LOG.debug("Mapped instance %s to %s", instance.uuid, node.uuid)

    def delete_instance(self, instance, node):
        """Remove ``instance`` from the model; any failure is only logged."""
        try:
            self.cluster_data_model.delete_instance(instance, node)
        except Exception:
            LOG.info("Instance %s already deleted", instance.uuid)

    def delete_node(self, node):
        """Remove ``node`` from the model; any failure is only logged."""
        try:
            self.cluster_data_model.remove_node(node)
        except Exception:
            LOG.info("Node %s already deleted", node.uuid)
class VersionedNotification(NovaNotification):
    """Endpoint dispatching Nova versioned notifications to model updates.

    ``notification_mapping`` associates each handled event type with the
    handler (a plain function taken from this class body) that ``info``
    invokes with the endpoint and the payload.
    """

    publisher_id_regex = r'^nova-.*'

    def service_updated(self, payload):
        """Apply a service create/update payload to its compute node."""
        node_data = payload['nova_object.data']
        node_name = node_data['host']
        try:
            node = self.get_or_create_node(node_name)
            self.update_compute_node(node, payload)
        except exception.ComputeNodeNotFound as exc:
            LOG.exception(exc)

    def service_deleted(self, payload):
        """Remove the compute node named in a service delete payload."""
        node_data = payload['nova_object.data']
        node_name = node_data['host']
        try:
            # NOTE(review): get_or_create_node may create a node that is not
            # yet in the model just before it is deleted -- confirm whether
            # a plain lookup would be sufficient here.
            node = self.get_or_create_node(node_name)
            self.delete_node(node)
        except exception.ComputeNodeNotFound as exc:
            LOG.exception(exc)

    def instance_updated(self, payload):
        """Refresh (or create) the instance described by ``payload``."""
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        instance_state = instance_data['state']
        node_name = instance_data.get('host')
        # if instance state is building, don't update data model
        if instance_state == 'building':
            return

        instance = self.get_or_create_instance(instance_uuid, node_name)

        self.update_instance(instance, payload)

    def instance_created(self, payload):
        """Add the newly created instance to the model and populate it."""
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        instance = element.Instance(uuid=instance_uuid)
        self.cluster_data_model.add_instance(instance)

        node_name = instance_data.get('host')
        if node_name:
            try:
                node = self.get_or_create_node(node_name)
            except exception.ComputeNodeNotFound as exc:
                # Same policy as the other handlers: an unresolvable node
                # must not abort the notification; the instance simply stays
                # unmapped.
                LOG.exception(exc)
            else:
                self.cluster_data_model.map_instance(instance, node)

        self.update_instance(instance, payload)

    def instance_deleted(self, payload):
        """Delete the instance described by ``payload`` from the model."""
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        # Tolerate payloads without a 'host' key; get_or_create_node returns
        # None for a missing identifier.
        node_name = instance_data.get('host')
        instance = self.get_or_create_instance(instance_uuid, node_name)

        try:
            node = self.get_or_create_node(node_name)
        except exception.ComputeNodeNotFound as exc:
            LOG.exception(exc)
            # If we can't create the node, we consider the instance as
            # unmapped
            node = None

        self.delete_instance(instance, node)

    # Event type -> handler. Values are the functions defined above (not
    # bound methods), so ``info`` passes ``self`` explicitly.
    notification_mapping = {
        'instance.create.end': instance_created,
        'instance.lock': instance_updated,
        'instance.unlock': instance_updated,
        'instance.pause.end': instance_updated,
        'instance.power_off.end': instance_updated,
        'instance.power_on.end': instance_updated,
        'instance.resize_confirm.end': instance_updated,
        'instance.restore.end': instance_updated,
        'instance.resume.end': instance_updated,
        'instance.shelve.end': instance_updated,
        'instance.shutdown.end': instance_updated,
        'instance.suspend.end': instance_updated,
        'instance.unpause.end': instance_updated,
        'instance.unrescue.end': instance_updated,
        'instance.unshelve.end': instance_updated,
        'instance.rebuild.end': instance_updated,
        'instance.rescue.end': instance_updated,
        'instance.update': instance_updated,
        'instance.live_migration_force_complete.end': instance_updated,
        'instance.live_migration_post.end': instance_updated,
        'instance.delete.end': instance_deleted,
        'instance.soft_delete.end': instance_deleted,
        'service.create': service_updated,
        'service.delete': service_deleted,
        'service.update': service_updated,
    }

    @property
    def filter_rule(self):
        """Nova notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle an info-priority notification.

        Logs the event, then dispatches ``payload`` to the handler registered
        for ``event_type`` in ``notification_mapping``. Events without a
        handler, or received while the cluster data model is falsy, are
        ignored.
        """
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        func = self.notification_mapping.get(event_type)
        if func:
            # The nova CDM is not built until an audit is performed.
            if self.cluster_data_model:
                LOG.debug(payload)
                func(self, payload)
            else:
                LOG.debug('Nova CDM has not yet been built; ignoring '
                          'notifications until an audit is performed.')