Coverage for watcher/decision_engine/strategy/strategies/storage_capacity_balance.py: 85%

225 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 ZTE Corporation 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17 

18from oslo_config import cfg 

19from oslo_log import log 

20 

21from watcher._i18n import _ 

22from watcher.common import cinder_helper 

23from watcher.decision_engine.strategy.strategies import base 

24 

25LOG = log.getLogger(__name__) 

26 

27 

class StorageCapacityBalance(base.WorkloadStabilizationBaseStrategy):
    """Storage capacity balance using cinder volume migration

    *Description*

    This strategy migrates volumes based on the workload of the
    cinder pools.
    It decides to migrate a volume whenever a pool's used
    utilization % is higher than the specified threshold. The volume
    to be moved should bring the pool close to the average workload of
    all cinder pools.

    *Requirements*

    * You must have at least 2 cinder volume pools to run
      this strategy.

    *Limitations*

    * Volume migration depends on the storage device.
      It may take a long time.

    *Spec URL*

    http://specs.openstack.org/openstack/watcher-specs/specs/queens/implemented/storage-capacity-balance.html
    """

    # Default of the ``volume_threshold`` input parameter; do_execute
    # divides it by 100, i.e. it is expressed in percent.
    _DEFAULT_VOLUME_THRESHOLD = 80.0
    # Only volumes in one of these states are candidates for a move.
    _VALID_VOLUME_STATUSES = ('in-use', 'available')

    def __init__(self, config, osc=None):
        """Set up the per-run state of the strategy.

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(StorageCapacityBalance, self).__init__(config, osc)
        self._cinder = None
        self.volume_threshold = self._DEFAULT_VOLUME_THRESHOLD
        # backend name -> list of volume types bound to that backend
        self.pool_type_cache = dict()
        # Filled by do_execute from group_pools(): pools at/over the
        # threshold (sources) and pools under it (destinations).
        self.source_pools = []
        self.dest_pools = []

    @property
    def cinder(self):
        """Return the cinder helper, creating it on first access."""
        if not self._cinder:
            self._cinder = cinder_helper.CinderHelper(osc=self.osc)
        return self._cinder

    @classmethod
    def get_name(cls):
        """Return the identifier of this strategy."""
        return "storage_capacity_balance"

    @classmethod
    def get_display_name(cls):
        """Return the translated display name of this strategy."""
        return _("Storage Capacity Balance Strategy")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated display name of this strategy."""
        return "Storage Capacity Balance Strategy"

    @classmethod
    def get_schema(cls):
        """Return the JSON schema of the strategy input parameters."""
        # Mandatory default setting for each element
        return {
            "properties": {
                "volume_threshold": {
                    "description": "volume threshold for capacity balance",
                    "type": "number",
                    "default": cls._DEFAULT_VOLUME_THRESHOLD
                },
            },
        }

    @classmethod
    def get_config_opts(cls):
        """Return the base config options plus the ``ex_pools`` option."""
        return super(StorageCapacityBalance, cls).get_config_opts() + [
            cfg.ListOpt(
                "ex_pools",
                help="exclude pools",
                default=['local_vstorage']),
        ]

    def get_pools(self, cinder):
        """Get all volume pools except those listed in ``ex_pools``.

        :param cinder: cinder client
        :return: list of pools whose ``pool_name`` is not excluded
        """
        ex_pools = self.config.ex_pools
        return [pool for pool in cinder.get_storage_pool_list()
                if pool.pool_name not in ex_pools]

    def get_volumes(self, cinder):
        """Get the volumes that are eligible for a move.

        A volume is eligible when it has no snapshot, its status is
        ``in-use`` or ``available`` and its ``migration_status`` is either
        ``None`` or ``'success'``.

        :param cinder: cinder client
        :return: list of eligible volumes
        """
        all_volumes = cinder.get_volume_list()
        snapshot_volume_ids = [snapshot.volume_id for snapshot
                               in cinder.get_volume_snapshots_list()]
        LOG.info("volumes in snap: %s", snapshot_volume_ids)

        # The list is kept for the log line above; the set gives O(1)
        # membership tests in the filter below.
        snapshotted = set(snapshot_volume_ids)
        valid_volumes = [
            v for v in all_volumes
            if v.id not in snapshotted and
            v.status in self._VALID_VOLUME_STATUSES and
            v.migration_status in (None, 'success')]
        LOG.info("valid volumes: %s", valid_volumes)

        return valid_volumes

    def group_pools(self, pools, threshold):
        """Group volume pools by threshold.

        A pool is "under" when its used capacity (total - free) is
        strictly below ``total * threshold``; otherwise it is "over".

        :param pools: all volume pools
        :param threshold: used-capacity threshold as a fraction of total
        :return: tuple ``(over_pools, under_pools)``
        """
        over_pools = []
        under_pools = []
        for pool in pools:
            total_cap = float(pool.total_capacity_gb)
            used_cap = total_cap - float(pool.free_capacity_gb)
            if used_cap < total_cap * threshold:
                under_pools.append(pool)
            else:
                over_pools.append(pool)

        return over_pools, under_pools

    def get_volume_type_by_name(self, cinder, backendname):
        """Return the volume types bound to a backend.

        Non-empty results are memoized in ``self.pool_type_cache``; an
        empty result is not cached, so it is looked up again next time.

        :param cinder: cinder client
        :param backendname: value of the ``volume_backend_name`` extra spec
        :return: list of matching volume types, ``[]`` when none match
        """
        if backendname in self.pool_type_cache:
            return self.pool_type_cache[backendname]

        matching_types = [
            vtype for vtype in cinder.get_volume_type_list()
            if vtype.extra_specs.get('volume_backend_name') == backendname]
        if not matching_types:
            return []
        self.pool_type_cache[backendname] = matching_types
        return matching_types

    @staticmethod
    def _free_ratio(pool):
        """Return the free/total capacity ratio used to rank pools."""
        return float(pool.free_capacity_gb) / float(pool.total_capacity_gb)

    def _free_after_placing(self, pool, volume, threshold):
        """Return the pool's free capacity once ``volume`` is placed on it.

        :return: remaining free capacity as a float, or ``None`` when the
                 pool would be over-allocated or would not stay under the
                 threshold.
        """
        total_cap = float(pool.total_capacity_gb)
        allocated = float(pool.allocated_capacity_gb)
        # Cast like the other capacity fields so a string value does not
        # break the multiplication.
        # NOTE(review): confirm the type cinder reports for this field.
        ratio = float(pool.max_over_subscription_ratio)
        size = float(volume.size)
        if total_cap * ratio < allocated + size:
            LOG.info("pool %s allocated over", pool.name)
            return None
        free_cap = float(pool.free_capacity_gb) - size
        if free_cap > (1 - threshold) * total_cap:
            return free_cap
        return None

    def migrate_fit(self, volume, threshold):
        """Find a destination pool for migrating an untyped volume.

        Destination pools are tried from the highest free ratio down. On
        success the chosen pool's ``free_capacity_gb`` is decreased by the
        volume size (only that field is updated) so later placements see
        the reservation. ``self.dest_pools`` is sorted in place.

        :param volume: candidate volume
        :param threshold: used-capacity threshold as a fraction of total
        :return: name of the chosen pool, or ``None`` when the volume has a
                 volume type or no pool fits
        """
        if volume.volume_type:
            # Typed volumes are not handled here; get_actions falls back
            # to retype_fit for them.
            LOG.info("volume %s type %s", volume.id, volume.volume_type)
            return None
        self.dest_pools.sort(key=self._free_ratio)
        for pool in reversed(self.dest_pools):
            free_cap = self._free_after_placing(pool, volume, threshold)
            if free_cap is None:
                continue
            pool.free_capacity_gb = str(free_cap)
            LOG.info("volume: get pool %s for vol %s", pool.name,
                     volume.name)
            return pool.name
        return None

    def check_pool_type(self, volume, dest_pool):
        """Find a volume type of ``dest_pool`` compatible with ``volume``.

        A destination type is compatible when it carries the same value
        for every extra spec of the volume's current type, ignoring
        ``volume_backend_name``.

        :param volume: volume to retype
        :param dest_pool: candidate destination pool
        :return: name of the first compatible type if it differs from the
                 volume's current type, otherwise ``None``
        """
        if not volume.volume_type:
            return None

        src_extra_specs = {}
        for vtype in self.cinder.get_volume_type_list():
            if vtype.name == volume.volume_type:
                # Build a filtered copy: the type object belongs to the
                # cinder helper and must not be modified here.
                src_extra_specs = {
                    key: value for key, value in vtype.extra_specs.items()
                    if key != 'volume_backend_name'}
                break

        candidates = self.get_volume_type_by_name(
            self.cinder, dest_pool.volume_backend_name)
        for key, value in src_extra_specs.items():
            candidates = [pt for pt in candidates
                          if pt.extra_specs.get(key) == value]

        if candidates and candidates[0].name != volume.volume_type:
            return candidates[0].name
        return None

    def retype_fit(self, volume, threshold):
        """Find a destination volume type for retyping a volume.

        Destination pools are tried from the highest free ratio down. On
        success the chosen pool's ``free_capacity_gb`` is decreased by the
        volume size (only that field is updated). ``self.dest_pools`` is
        sorted in place.

        :param volume: candidate volume
        :param threshold: used-capacity threshold as a fraction of total
        :return: name of the target volume type, or ``None`` when no pool
                 offers a suitable type
        """
        self.dest_pools.sort(key=self._free_ratio)
        for pool in reversed(self.dest_pools):
            pool_type = self.get_volume_type_by_name(
                self.cinder, pool.volume_backend_name)
            LOG.info("volume: pool %s, type %s", pool.name, pool_type)
            # get_volume_type_by_name returns [] (never None) when the
            # backend has no volume type, so test for emptiness.
            if not pool_type:
                continue
            free_cap = self._free_after_placing(pool, volume, threshold)
            if free_cap is None:
                continue
            target_type = self.check_pool_type(volume, pool)
            if target_type is None:
                continue
            pool.free_capacity_gb = str(free_cap)
            LOG.info("volume: get type %s for vol %s", target_type,
                     volume.name)
            return target_type
        return None

    def _plan_move(self, vol, threshold, retype_dicts, migrate_dicts):
        """Record a migrate (preferred) or retype action for ``vol``.

        :return: ``True`` when an action was recorded, ``False`` otherwise
        """
        migrate_pool = self.migrate_fit(vol, threshold)
        if migrate_pool:
            migrate_dicts[vol.id] = migrate_pool
            return True
        target_type = self.retype_fit(vol, threshold)
        if target_type:
            retype_dicts[vol.id] = target_type
            return True
        return False

    def get_actions(self, pool, volumes, threshold):
        """Plan the moves that bring ``pool`` back under the threshold.

        Volumes hosted on the pool are considered in three passes, each
        sorted by ascending size: available volumes, then in-use
        non-bootable volumes, then in-use bootable volumes. Planning stops
        as soon as the projected used capacity drops below
        ``threshold * total``.

        :param pool: over-threshold source pool
        :param volumes: all eligible volumes
        :param threshold: used-capacity threshold as a fraction of total
        :return: tuple ``(retype_dicts, migrate_dicts)`` mapping volume id
                 to target volume type / target pool name
        """
        retype_dicts = dict()
        migrate_dicts = dict()
        total_cap = float(pool.total_capacity_gb)
        used_cap = total_cap - float(pool.free_capacity_gb)

        # A volume without the host attribute simply does not match.
        volumes_in_pool = [
            v for v in volumes
            if getattr(v, 'os-vol-host-attr:host', None) == pool.name]
        LOG.info("volumes in pool: %s", str(volumes_in_pool))
        if not volumes_in_pool:
            return retype_dicts, migrate_dicts

        passes = (
            ("available volumes in pool: %s ",
             lambda v: v.status == 'available'),
            ("noboot volumes: %s ",
             lambda v: v.bootable.lower() == 'false' and
             v.status == 'in-use'),
            ("boot volumes: %s ",
             lambda v: v.bootable.lower() == 'true' and
             v.status == 'in-use'),
        )
        for log_format, selector in passes:
            candidates = sorted(filter(selector, volumes_in_pool),
                                key=lambda v: float(v.size))
            LOG.info(log_format, str(candidates))
            for vol in candidates:
                if not self._plan_move(vol, threshold,
                                       retype_dicts, migrate_dicts):
                    continue
                used_cap -= float(vol.size)
                if used_cap < threshold * total_cap:
                    # The pool is back under the threshold.
                    return retype_dicts, migrate_dicts
        return retype_dicts, migrate_dicts

    def pre_execute(self):
        """Read the ``volume_threshold`` input parameter."""
        # The display name already ends with "Strategy".
        LOG.info("Initializing %s", self.get_display_name())
        self.volume_threshold = self.input_parameters.volume_threshold

    def do_execute(self, audit=None):
        """Strategy execution phase

        Splits the pools into over- and under-threshold groups and adds
        one ``volume_migrate`` action to the solution for every planned
        retype or migration.
        """
        all_pools = self.get_pools(self.cinder)
        all_volumes = self.get_volumes(self.cinder)
        threshold = float(self.volume_threshold) / 100
        self.source_pools, self.dest_pools = self.group_pools(
            all_pools, threshold)
        LOG.info(" source pools: %s dest pools:%s",
                 self.source_pools, self.dest_pools)
        if not self.source_pools:
            LOG.info("No pools require optimization")
            return

        if not self.dest_pools:
            LOG.info("No enough pools for optimization")
            return

        # Index the volumes once; the first occurrence of an id wins.
        volumes_by_id = {}
        for volume in all_volumes:
            volumes_by_id.setdefault(volume.id, volume)

        for source_pool in self.source_pools:
            retype_actions, migrate_actions = self.get_actions(
                source_pool, all_volumes, threshold)
            for vol_id, pool_type in retype_actions.items():
                parameters = {'migration_type': 'retype',
                              'destination_type': pool_type,
                              'resource_name': volumes_by_id[vol_id].name}
                self.solution.add_action(action_type='volume_migrate',
                                         resource_id=vol_id,
                                         input_parameters=parameters)
            for vol_id, pool_name in migrate_actions.items():
                parameters = {'migration_type': 'migrate',
                              'destination_node': pool_name,
                              'resource_name': volumes_by_id[vol_id].name}
                self.solution.add_action(action_type='volume_migrate',
                                         resource_id=vol_id,
                                         input_parameters=parameters)

    def post_execute(self):
        """Post-execution phase

        Sets both efficacy indicators of the solution to zero.
        """
        self.solution.set_efficacy_indicators(
            instance_migrations_count=0,
            instances_count=0,
        )

402 )