Coverage for watcher/decision_engine/model/notification/cinder.py: 82%

209 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright 2017 NEC Corporation 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from oslo_log import log 

18from watcher.common import cinder_helper 

19from watcher.common import exception 

20from watcher.decision_engine.model import element 

21from watcher.decision_engine.model.notification import base 

22from watcher.decision_engine.model.notification import filtering 

23 

24LOG = log.getLogger(__name__) 

25 

26 

class CinderNotification(base.NotificationEndpoint):
    """Base endpoint for Cinder notifications.

    Provides helpers that keep the storage nodes, pools and volumes of
    the cluster data model up to date, using the notification payload
    and, where needed, data fetched through the Cinder helper.
    """

    # Translates the attachment keys found in the notification payload
    # into the key names stored on the volume element. Keys that are not
    # listed here are dropped.
    _ATTACHMENT_KEY_MAP = {
        'instance_uuid': 'server_id',
        'id': 'attachment_id',
    }

    def __init__(self, collector):
        super(CinderNotification, self).__init__(collector)
        # Instantiated lazily by the ``cinder`` property.
        self._cinder = None

    @property
    def cinder(self):
        """Return the Cinder helper, creating it on first access."""
        if self._cinder is None:
            self._cinder = cinder_helper.CinderHelper()
        return self._cinder

    def _map_pool_to_node(self, pool):
        """Map the pool to its storage node, creating the node if needed.

        :param pool: pool element whose name starts with the node name
        :raises: StorageNodeNotFound if the node cannot be created
        """
        # The part of the pool name before '#' is the storage node name.
        node_name = pool.name.split("#")[0]
        node = self.get_or_create_node(node_name)
        self.cluster_data_model.map_pool(pool, node)
        LOG.debug("Mapped pool %s to %s", pool.name, node.host)

    def update_pool(self, pool, data):
        """Update the storage pool using the notification data.

        :param pool: pool element to update; nothing is done when None
        :param data: notification payload providing the 'total', 'free',
                     'provisioned', 'allocated' and 'virtual_free' keys
        """
        # get_or_create_pool() returns None when no pool name is given;
        # skip in that case, as update_pool_by_api() does.
        if pool is None:
            return
        pool.update({
            "total_capacity_gb": data['total'],
            "free_capacity_gb": data['free'],
            "provisioned_capacity_gb": data['provisioned'],
            "allocated_capacity_gb": data['allocated'],
            "virtual_free": data['virtual_free']
        })
        self._map_pool_to_node(pool)

    def update_pool_by_api(self, pool):
        """Update the storage pool using the API data.

        :param pool: pool element to refresh; nothing is done when falsy
        """
        if not pool:
            return
        _pool = self.cinder.get_storage_pool_by_name(pool.name)
        pool.update({
            "total_volumes": _pool.total_volumes,
            "total_capacity_gb": _pool.total_capacity_gb,
            "free_capacity_gb": _pool.free_capacity_gb,
            "provisioned_capacity_gb": _pool.provisioned_capacity_gb,
            "allocated_capacity_gb": _pool.allocated_capacity_gb
        })
        self._map_pool_to_node(pool)

    def create_storage_node(self, name):
        """Create the storage node by querying the Cinder API.

        :param name: storage node name, formatted as host@backendname
        :returns: a new StorageNode element (not yet added to the model)
        :raises: StorageNodeNotFound if anything goes wrong while
                 building the node, including a malformed name
        """
        try:
            _node = self.cinder.get_storage_node_by_name(name)
            _volume_type = self.cinder.get_volume_type_by_backendname(
                # name is formatted as host@backendname
                name.split('@')[1])
            storage_node = element.StorageNode(
                host=_node.host,
                zone=_node.zone,
                state=_node.state,
                status=_node.status,
                volume_type=_volume_type)
            return storage_node
        except Exception as exc:
            LOG.exception(exc)
            LOG.debug("Could not create storage node %s.", name)
            raise exception.StorageNodeNotFound(name=name)

    def get_or_create_node(self, name):
        """Get storage node by name, otherwise create storage node

        :param name: storage node name
        :returns: the node element, or None when ``name`` is None
        :raises: StorageNodeNotFound if the node has to be created and
                 the creation fails
        """
        if name is None:
            LOG.debug("Storage node name not provided: skipping")
            return
        try:
            return self.cluster_data_model.get_node_by_name(name)
        except exception.StorageNodeNotFound:
            # The node didn't exist yet so we create a new node object
            node = self.create_storage_node(name)
            LOG.debug("New storage node created: %s", name)
            self.cluster_data_model.add_node(node)
            LOG.debug("New storage node added: %s", name)
            return node

    def create_pool(self, pool_name):
        """Create the storage pool by querying the Cinder API.

        :param pool_name: name of the pool to look up
        :returns: a new Pool element (not yet added to the model)
        :raises: PoolNotFound if anything goes wrong while building it
        """
        try:
            _pool = self.cinder.get_storage_pool_by_name(pool_name)
            pool = element.Pool(
                name=_pool.name,
                total_volumes=_pool.total_volumes,
                total_capacity_gb=_pool.total_capacity_gb,
                free_capacity_gb=_pool.free_capacity_gb,
                provisioned_capacity_gb=_pool.provisioned_capacity_gb,
                allocated_capacity_gb=_pool.allocated_capacity_gb)
            return pool
        except Exception as exc:
            LOG.exception(exc)
            LOG.debug("Could not refresh the pool %s.", pool_name)
            raise exception.PoolNotFound(name=pool_name)

    def get_or_create_pool(self, name):
        """Get the pool by name, otherwise create it and add it.

        :param name: pool name
        :returns: the pool element, or None when ``name`` is falsy
        :raises: PoolNotFound if the pool has to be created and the
                 creation fails
        """
        if not name:
            LOG.debug("Pool name not provided: skipping")
            return
        try:
            return self.cluster_data_model.get_pool_by_pool_name(name)
        except exception.PoolNotFound:
            # The pool didn't exist yet so we create a new pool object
            pool = self.create_pool(name)
            LOG.debug("New storage pool created: %s", name)
            self.cluster_data_model.add_pool(pool)
            LOG.debug("New storage pool added: %s", name)
            return pool

    def get_or_create_volume(self, volume_id, pool_name=None):
        """Get the volume by uuid, otherwise create it and add it.

        :param volume_id: uuid of the volume
        :param pool_name: optional pool name; when given, the pool is
                          looked up (or created) first, and a failure to
                          do so is only logged
        :returns: the volume element
        """
        try:
            if pool_name:
                self.get_or_create_pool(pool_name)
        except exception.PoolNotFound:
            LOG.warning("Could not find storage pool %(pool)s for "
                        "volume %(volume)s",
                        dict(pool=pool_name, volume=volume_id))
        try:
            return self.cluster_data_model.get_volume_by_uuid(volume_id)
        except exception.VolumeNotFound:
            # The volume didn't exist yet so we create a new volume object
            volume = element.Volume(uuid=volume_id)
            self.cluster_data_model.add_volume(volume)
            return volume

    def update_volume(self, volume, data):
        """Update the volume using the notification data.

        :param volume: volume element to update
        :param data: notification payload; the keys read here are
                     'volume_attachment', 'display_name', 'size',
                     'status', 'snapshot_id', 'tenant_id', 'metadata',
                     'host' and, optionally, 'glance_metadata'
        """
        key_map = self._ATTACHMENT_KEY_MAP
        attachments = [
            {key_map[k]: v for k, v in d.items() if k in key_map}
            for d in data['volume_attachment']
        ]

        # glance_metadata is provided if volume is bootable
        bootable = 'glance_metadata' in data

        volume.update({
            "name": data['display_name'] or "",
            "size": data['size'],
            "status": data['status'],
            "attachments": attachments,
            "snapshot_id": data['snapshot_id'] or "",
            "project_id": data['tenant_id'],
            "metadata": data['metadata'],
            "bootable": bootable
        })

        try:
            # if volume is under pool, let's update pool element.
            # get existing pool or create pool by cinder api
            pool = self.get_or_create_pool(data['host'])
            self.update_pool_by_api(pool)

        except exception.PoolNotFound as exc:
            LOG.exception(exc)
            pool = None

        self.update_volume_mapping(volume, pool)

    def update_volume_mapping(self, volume, pool):
        """Map the volume to ``pool``, unmapping it from any other pool.

        :param volume: volume element to map
        :param pool: target pool element; when None the volume is only
                     added to the model and left unmapped
        """
        if pool is None:
            self.cluster_data_model.add_volume(volume)
            LOG.debug("Volume %s not yet attached to any pool: skipping",
                      volume.uuid)
            return
        try:
            try:
                current_pool = (
                    self.cluster_data_model.get_pool_by_volume(
                        volume) or self.get_or_create_pool(pool.name))
            except exception.PoolNotFound as exc:
                LOG.exception(exc)
                # If we can't create the pool,
                # we consider the volume as unmapped
                current_pool = None

            LOG.debug("Mapped pool %s found", pool.name)
            if current_pool and pool != current_pool:
                # Report the pool the volume is actually removed from.
                LOG.debug("Unmapping volume %s from %s",
                          volume.uuid, current_pool.name)
                self.cluster_data_model.unmap_volume(volume, current_pool)
        except exception.VolumeNotFound:
            # The volume didn't exist yet so we map it for the first time
            LOG.debug("New volume: mapping it to %s", pool.name)
        finally:
            if pool:
                self.cluster_data_model.map_volume(volume, pool)
                LOG.debug("Mapped volume %s to %s", volume.uuid, pool.name)

    def delete_volume(self, volume, pool):
        """Delete the volume from the model and refresh its pool.

        :param volume: volume element to delete
        :param pool: pool element the volume belonged to, or None
        """
        try:
            self.cluster_data_model.delete_volume(volume)
        except Exception:
            # Best effort: any failure is treated as "already deleted".
            LOG.info("Volume %s already deleted", volume.uuid)

        try:
            if pool:
                # if volume is under pool, let's update pool element.
                # get existing pool or create pool by cinder api
                pool = self.get_or_create_pool(pool.name)
                self.update_pool_by_api(pool)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)

240 

241 

class CapacityNotificationEndpoint(CinderNotification):
    """Endpoint that processes Cinder 'capacity.pool' notifications."""

    @property
    def filter_rule(self):
        """Cinder capacity notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=r'capacity.*',
            event_type='capacity.pool',
        )
        return rule

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Refresh the pool named by the payload with its capacity data.

        The pool is read from (or created in) the model using
        ``payload['name_to_id']``; a PoolNotFound error is only logged.
        """
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type

        log_fields = {
            'event': event_type,
            'publisher': publisher_id,
            'metadata': metadata,
        }
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 log_fields)
        LOG.debug(payload)

        pool_name = payload['name_to_id']
        try:
            self.update_pool(self.get_or_create_pool(pool_name), payload)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)

267 

268 

class VolumeNotificationEndpoint(CinderNotification):
    """Common base for the volume notification endpoints."""

    # Regular expression the subclasses pass as ``publisher_id`` when
    # building their notification filter.
    publisher_id_regex = r'^volume.*'

271 

272 

class VolumeCreateEnd(VolumeNotificationEndpoint):
    """Endpoint that processes 'volume.create.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.create.end',
        )
        return rule

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Create (or fetch) the notified volume and update it.

        The volume is identified by ``payload['volume_id']`` and its
        pool by ``payload['host']``.
        """
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type

        log_fields = {
            'event': event_type,
            'publisher': publisher_id,
            'metadata': metadata,
        }
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 log_fields)
        LOG.debug(payload)

        volume_id, pool_name = payload['volume_id'], payload['host']
        self.update_volume(
            self.get_or_create_volume(volume_id, pool_name), payload)

296 

297 

class VolumeUpdateEnd(VolumeNotificationEndpoint):
    """Endpoint that processes 'volume.update.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.update.end',
        )
        return rule

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Fetch (or create) the notified volume and update it.

        The volume is identified by ``payload['volume_id']`` and its
        pool by ``payload['host']``.
        """
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type

        log_fields = {
            'event': event_type,
            'publisher': publisher_id,
            'metadata': metadata,
        }
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 log_fields)
        LOG.debug(payload)

        volume_id, pool_name = payload['volume_id'], payload['host']
        self.update_volume(
            self.get_or_create_volume(volume_id, pool_name), payload)

321 

322 

class VolumeAttachEnd(VolumeUpdateEnd):
    """Endpoint that processes 'volume.attach.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.attach.end',
        )
        return rule

332 

333 

class VolumeDetachEnd(VolumeUpdateEnd):
    """Endpoint that processes 'volume.detach.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.detach.end',
        )
        return rule

343 

344 

class VolumeResizeEnd(VolumeUpdateEnd):
    """Endpoint that processes 'volume.resize.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.resize.end',
        )
        return rule

354 

355 

class VolumeDeleteEnd(VolumeNotificationEndpoint):
    """Endpoint that processes 'volume.delete.end' notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.delete.end',
        )
        return rule

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Delete the notified volume and refresh the pool it was in.

        The volume is identified by ``payload['volume_id']`` and its
        pool by ``payload['host']``. When the pool cannot be found the
        error is logged and the volume is deleted without a pool.
        """
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type

        log_fields = {
            'event': event_type,
            'publisher': publisher_id,
            'metadata': metadata,
        }
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 log_fields)
        LOG.debug(payload)

        volume_id, pool_name = payload['volume_id'], payload['host']
        volume = self.get_or_create_volume(volume_id, pool_name)

        # Stays None when the pool lookup fails.
        pool = None
        try:
            pool = self.get_or_create_pool(pool_name)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)

        self.delete_volume(volume, pool)

384 

385 self.delete_volume(volume, pool)