Coverage for watcher/decision_engine/model/notification/cinder.py: 82% of 209 statements (coverage.py v7.8.2, report created 2025-06-17 12:22 +0000)
# -*- encoding: utf-8 -*-
# Copyright 2017 NEC Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_log import log
from watcher.common import cinder_helper
from watcher.common import exception
from watcher.decision_engine.model import element
from watcher.decision_engine.model.notification import base
from watcher.decision_engine.model.notification import filtering

LOG = log.getLogger(__name__)


class CinderNotification(base.NotificationEndpoint):

    def __init__(self, collector):
        super(CinderNotification, self).__init__(collector)
        self._cinder = None
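
    # The Cinder API helper is created lazily, the first time the
    # cinder property below is accessed.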
    @property
    def cinder(self):
        if self._cinder is None:
            self._cinder = cinder_helper.CinderHelper()
        return self._cinder

    def update_pool(self, pool, data):
        """Update the storage pool using the notification data."""
        pool.update({
            "total_capacity_gb": data['total'],
            "free_capacity_gb": data['free'],
            "provisioned_capacity_gb": data['provisioned'],
            "allocated_capacity_gb": data['allocated'],
            "virtual_free": data['virtual_free']
        })
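
        # Pool names are fully qualified ("host@backend#pool"), so the part
        # before '#' identifies the storage node the pool belongs to.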
        node_name = pool.name.split("#")[0]
        node = self.get_or_create_node(node_name)
        self.cluster_data_model.map_pool(pool, node)
        LOG.debug("Mapped pool %s to %s", pool.name, node.host)

    def update_pool_by_api(self, pool):
        """Update the storage pool using the API data."""
        if not pool:
            return
        _pool = self.cinder.get_storage_pool_by_name(pool.name)
        pool.update({
            "total_volumes": _pool.total_volumes,
            "total_capacity_gb": _pool.total_capacity_gb,
            "free_capacity_gb": _pool.free_capacity_gb,
            "provisioned_capacity_gb": _pool.provisioned_capacity_gb,
            "allocated_capacity_gb": _pool.allocated_capacity_gb
        })
        node_name = pool.name.split("#")[0]
        node = self.get_or_create_node(node_name)
        self.cluster_data_model.map_pool(pool, node)
        LOG.debug("Mapped pool %s to %s", pool.name, node.host)

    def create_storage_node(self, name):
        """Create the storage node by querying the Cinder API."""
        try:
            _node = self.cinder.get_storage_node_by_name(name)
            _volume_type = self.cinder.get_volume_type_by_backendname(
                # name is formatted as host@backendname
                name.split('@')[1])
            storage_node = element.StorageNode(
                host=_node.host,
                zone=_node.zone,
                state=_node.state,
                status=_node.status,
                volume_type=_volume_type)
            return storage_node
        except Exception as exc:
            LOG.exception(exc)
            LOG.debug("Could not create storage node %s.", name)
            raise exception.StorageNodeNotFound(name=name)

    def get_or_create_node(self, name):
        """Get storage node by name, otherwise create storage node"""
        if name is None:
            LOG.debug("Storage node name not provided: skipping")
            return
        try:
            return self.cluster_data_model.get_node_by_name(name)
        except exception.StorageNodeNotFound:
            # The node didn't exist yet so we create a new node object
            node = self.create_storage_node(name)
            LOG.debug("New storage node created: %s", name)
            self.cluster_data_model.add_node(node)
            LOG.debug("New storage node added: %s", name)
            return node

    def create_pool(self, pool_name):
        """Create the storage pool by querying the Cinder API."""
        try:
            _pool = self.cinder.get_storage_pool_by_name(pool_name)
            pool = element.Pool(
                name=_pool.name,
                total_volumes=_pool.total_volumes,
                total_capacity_gb=_pool.total_capacity_gb,
                free_capacity_gb=_pool.free_capacity_gb,
                provisioned_capacity_gb=_pool.provisioned_capacity_gb,
                allocated_capacity_gb=_pool.allocated_capacity_gb)
            return pool
        except Exception as exc:
            LOG.exception(exc)
            LOG.debug("Could not refresh the pool %s.", pool_name)
            raise exception.PoolNotFound(name=pool_name)

    def get_or_create_pool(self, name):
        """Get storage pool by name, otherwise create the pool"""
        if not name:
            LOG.debug("Pool name not provided: skipping")
            return
        try:
            return self.cluster_data_model.get_pool_by_pool_name(name)
        except exception.PoolNotFound:
            # The pool didn't exist yet so we create a new pool object
            pool = self.create_pool(name)
            LOG.debug("New storage pool created: %s", name)
            self.cluster_data_model.add_pool(pool)
            LOG.debug("New storage pool added: %s", name)
            return pool

    def get_or_create_volume(self, volume_id, pool_name=None):
        """Get volume by UUID, otherwise create a new volume element"""
        try:
            if pool_name:
                self.get_or_create_pool(pool_name)
        except exception.PoolNotFound:
            LOG.warning("Could not find storage pool %(pool)s for "
                        "volume %(volume)s",
                        dict(pool=pool_name, volume=volume_id))
        try:
            return self.cluster_data_model.get_volume_by_uuid(volume_id)
        except exception.VolumeNotFound:
            # The volume didn't exist yet so we create a new volume object
            volume = element.Volume(uuid=volume_id)
            self.cluster_data_model.add_volume(volume)
            return volume

    def update_volume(self, volume, data):
        """Update the volume using the notification data."""

        def _keyReplace(key):
            if key == 'instance_uuid':
                return 'server_id'
            if key == 'id':
                return 'attachment_id'
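
        # Keep only the attachment fields the model needs, renaming Cinder's
        # 'instance_uuid' and 'id' keys to 'server_id' and 'attachment_id'.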
        attachments = [
            {_keyReplace(k): v for k, v in iter(d.items())
             if k in ('instance_uuid', 'id')}
            for d in data['volume_attachment']
        ]

        # glance_metadata is provided if volume is bootable
        bootable = False
        if 'glance_metadata' in data:
            bootable = True

        volume.update({
            "name": data['display_name'] or "",
            "size": data['size'],
            "status": data['status'],
            "attachments": attachments,
            "snapshot_id": data['snapshot_id'] or "",
            "project_id": data['tenant_id'],
            "metadata": data['metadata'],
            "bootable": bootable
        })

        try:
            # if volume is under pool, let's update pool element.
            # get existing pool or create pool by cinder api
            pool = self.get_or_create_pool(data['host'])
            self.update_pool_by_api(pool)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)
            pool = None

        self.update_volume_mapping(volume, pool)

    def update_volume_mapping(self, volume, pool):
        """Map the volume to its pool, remapping it if the pool changed"""
        if pool is None:
            self.cluster_data_model.add_volume(volume)
            LOG.debug("Volume %s not yet attached to any pool: skipping",
                      volume.uuid)
            return
        try:
            try:
                current_pool = (
                    self.cluster_data_model.get_pool_by_volume(
                        volume) or self.get_or_create_pool(pool.name))
            except exception.PoolNotFound as exc:
                LOG.exception(exc)
                # If we can't create the pool,
                # we consider the volume as unmapped
                current_pool = None

            LOG.debug("Mapped pool %s found", pool.name)
            if current_pool and pool != current_pool:
                LOG.debug("Unmapping volume %s from %s",
                          volume.uuid, pool.name)
                self.cluster_data_model.unmap_volume(volume, current_pool)
        except exception.VolumeNotFound:
            # The volume didn't exist yet so we map it for the first time
            LOG.debug("New volume: mapping it to %s", pool.name)
        finally:
            if pool:
                self.cluster_data_model.map_volume(volume, pool)
                LOG.debug("Mapped volume %s to %s", volume.uuid, pool.name)

    def delete_volume(self, volume, pool):
        """Delete the volume from the model and refresh its pool"""
        try:
            self.cluster_data_model.delete_volume(volume)
        except Exception:
            LOG.info("Volume %s already deleted", volume.uuid)

        try:
            if pool:
                # if volume is under pool, let's update pool element.
                # get existing pool or create pool by cinder api
                pool = self.get_or_create_pool(pool.name)
                self.update_pool_by_api(pool)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)
            pool = None


class CapacityNotificationEndpoint(CinderNotification):

    @property
    def filter_rule(self):
        """Cinder capacity notification filter"""
        return filtering.NotificationFilter(
            publisher_id=r'capacity.*',
            event_type='capacity.pool',
        )
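
    # Notifications matching the filter above are dispatched by
    # oslo.messaging to the info() handler below (INFO priority).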
    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
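        # A capacity.pool payload carries the pool name in 'name_to_id' plus
        # the capacity figures consumed by update_pool(); roughly, with
        # illustrative values only:
        #   {"name_to_id": "host@backend#pool1", "total": 500, "free": 200,
        #    "provisioned": 350, "allocated": 300, "virtual_free": 150}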
        name = payload['name_to_id']
        try:
            pool = self.get_or_create_pool(name)
            self.update_pool(pool, payload)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)


class VolumeNotificationEndpoint(CinderNotification):
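    # Cinder volume notifications are published with a publisher_id that
    # starts with "volume" (e.g. "volume.<host>"), hence the shared regex.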
    publisher_id_regex = r'^volume.*'


class VolumeCreateEnd(VolumeNotificationEndpoint):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.create.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)
        self.update_volume(volume, payload)


class VolumeUpdateEnd(VolumeNotificationEndpoint):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.update.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)
        self.update_volume(volume, payload)
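

# The attach, detach and resize handlers below reuse VolumeUpdateEnd.info()
# and only override the event type they subscribe to.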
class VolumeAttachEnd(VolumeUpdateEnd):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.attach.end',
        )


class VolumeDetachEnd(VolumeUpdateEnd):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.detach.end',
        )


class VolumeResizeEnd(VolumeUpdateEnd):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.resize.end',
        )


class VolumeDeleteEnd(VolumeNotificationEndpoint):

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.delete.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)

        try:
            pool = self.get_or_create_pool(poolname)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)
            pool = None

        self.delete_volume(volume, pool)