Coverage for watcher/decision_engine/strategy/strategies/storage_capacity_balance.py: 85%
225 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2017 ZTE Corporation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
18from oslo_config import cfg
19from oslo_log import log
21from watcher._i18n import _
22from watcher.common import cinder_helper
23from watcher.decision_engine.strategy.strategies import base
25LOG = log.getLogger(__name__)
28class StorageCapacityBalance(base.WorkloadStabilizationBaseStrategy):
29 """Storage capacity balance using cinder volume migration
31 *Description*
    This strategy migrates volumes based on the workload of the
    cinder pools.
    It decides to migrate a volume whenever the used capacity of a
    pool, as a percentage of its total capacity, is higher than the
    specified threshold. The volume to be moved should bring the pool
    close to the average workload of all cinder pools.
40 *Requirements*
42 * You must have at least 2 cinder volume pools to run
43 this strategy.
45 *Limitations*
47 * Volume migration depends on the storage device.
48 It may take a long time.
50 *Spec URL*
52 http://specs.openstack.org/openstack/watcher-specs/specs/queens/implemented/storage-capacity-balance.html
53 """
def __init__(self, config, osc=None):
    """Set up the storage capacity balance strategy.

    :param config: A mapping containing the configuration of this strategy
    :type config: :py:class:`~.Struct` instance
    :param osc: :py:class:`~.OpenStackClients` instance
    """
    super(StorageCapacityBalance, self).__init__(config, osc)
    # Over-threshold (source) and under-threshold (destination) pools;
    # both are filled in by do_execute().
    self.source_pools = []
    self.dest_pools = []
    # Backend name -> list of matching volume types, filled on demand.
    self.pool_type_cache = {}
    # Percentage; overwritten from the input parameters in pre_execute().
    self.volume_threshold = 80.0
    # Cinder helper, created on first access of the ``cinder`` property.
    self._cinder = None
@property
def cinder(self):
    """Return the cinder helper, creating it on first use."""
    if self._cinder:
        return self._cinder
    self._cinder = cinder_helper.CinderHelper(osc=self.osc)
    return self._cinder
@classmethod
def get_name(cls):
    """Return the identifier of this strategy."""
    return "storage_capacity_balance"
@classmethod
def get_display_name(cls):
    """Return the display name, passed through the ``_`` i18n helper."""
    display_name = _("Storage Capacity Balance Strategy")
    return display_name
@classmethod
def get_translatable_display_name(cls):
    """Return the untranslated display name of this strategy."""
    return "Storage Capacity Balance Strategy"
@classmethod
def get_schema(cls):
    """Return the schema of the strategy's input parameters.

    The only property is ``volume_threshold``, a number defaulting to
    80.0.
    """
    # Mandatory default setting for each element
    volume_threshold = {
        "description": "volume threshold for capacity balance",
        "type": "number",
        "default": 80.0,
    }
    return {"properties": {"volume_threshold": volume_threshold}}
@classmethod
def get_config_opts(cls):
    """Return the inherited config options followed by ``ex_pools``.

    ``ex_pools`` is a list option naming the pools that get_pools()
    leaves out; it defaults to ``['local_vstorage']``.
    """
    inherited_opts = super(StorageCapacityBalance, cls).get_config_opts()
    excluded_pools_opt = cfg.ListOpt(
        "ex_pools",
        help="exclude pools",
        default=['local_vstorage'])
    return inherited_opts + [excluded_pools_opt]
def get_pools(self, cinder):
    """Return the storage pools that are not listed in ``ex_pools``.

    :param cinder: cinder client
    :return: volume pools whose ``pool_name`` is not excluded
    """
    excluded = self.config.ex_pools
    kept = []
    for candidate in cinder.get_storage_pool_list():
        if candidate.pool_name in excluded:
            continue
        kept.append(candidate)
    return kept
def get_volumes(self, cinder):
    """Get the volumes that are eligible for balancing.

    A volume is kept when it has no snapshot, its status is ``in-use``
    or ``available``, and its ``migration_status`` is ``None`` or
    ``success``.

    :param cinder: cinder client
    :return: eligible volumes, in the order cinder returned them
    """
    valid_status = ('in-use', 'available')
    valid_migration_status = (None, 'success')

    all_volumes = cinder.get_volume_list()
    snapshot_volume_ids = [
        snapshot.volume_id
        for snapshot in cinder.get_volume_snapshots_list()]
    LOG.info("volumes in snap: %s", snapshot_volume_ids)

    # A set keeps the snapshot test O(1) per volume; the list above is
    # kept only so the log line looks the same as before.
    snapshotted = set(snapshot_volume_ids)
    valid_volumes = [
        v for v in all_volumes
        if v.id not in snapshotted and
        v.status in valid_status and
        v.migration_status in valid_migration_status]
    LOG.info("valid volumes: %s", valid_volumes)

    return valid_volumes
def group_pools(self, pools, threshold):
    """Split volume pools by how their used capacity compares to threshold.

    A pool is "over" when ``total - free >= total * threshold`` and
    "under" when it is strictly below that limit.

    :param pools: all volume pools
    :param threshold: volume threshold, as a fraction of total capacity
    :return: tuple ``(over_pools, under_pools)``
    """
    over_pools = []
    under_pools = []
    for pool in pools:
        used = float(pool.total_capacity_gb) - float(pool.free_capacity_gb)
        limit = float(pool.total_capacity_gb) * threshold
        if used < limit:
            under_pools.append(pool)
        elif used >= limit:
            over_pools.append(pool)
    return over_pools, under_pools
def get_volume_type_by_name(self, cinder, backendname):
    """Return the volume types bound to the given backend name.

    Non-empty results are stored in ``self.pool_type_cache`` and served
    from there on later calls; an empty result is not cached.

    :param cinder: cinder client
    :param backendname: value of the ``volume_backend_name`` extra spec
    :return: list of matching volume types, ``[]`` when there is none
    """
    cache = self.pool_type_cache
    if backendname in cache:
        return cache[backendname]

    matching = [
        vtype for vtype in cinder.get_volume_type_list()
        if vtype.extra_specs.get('volume_backend_name') == backendname]
    if not matching:
        return []
    cache[backendname] = matching
    return matching
def migrate_fit(self, volume, threshold):
    """Pick a destination pool that can take ``volume`` by migration.

    Only volumes without a volume type are handled; for a typed volume
    the method logs and returns None. Destination pools are tried from
    the largest free/total ratio downwards. A pool is chosen when the
    volume fits within its over-subscription limit and the pool would
    still have more than ``(1 - threshold)`` of its capacity free.

    The chosen pool's ``free_capacity_gb`` is reduced by the volume size
    so that later calls see the space as taken.

    :param volume: volume to move
    :param threshold: volume threshold, as a fraction of total capacity
    :return: name of the chosen pool, or None when no pool fits
    """
    if volume.volume_type:
        LOG.info("volume %s type %s", volume.id, volume.volume_type)
        return None

    self.dest_pools.sort(
        key=lambda p: float(p.free_capacity_gb) /
        float(p.total_capacity_gb))
    for pool in reversed(self.dest_pools):
        volume_size = float(volume.size)
        total_cap = float(pool.total_capacity_gb)
        allocated = float(pool.allocated_capacity_gb)
        # float() also accepts a ratio supplied as a numeric string,
        # which a bare multiplication would reject with a TypeError.
        ratio = float(pool.max_over_subscription_ratio)
        if total_cap * ratio < allocated + volume_size:
            LOG.info("pool %s allocated over", pool.name)
            continue
        free_cap = float(pool.free_capacity_gb) - volume_size
        if free_cap > (1 - threshold) * total_cap:
            # Book the space on the pool object itself.
            pool.free_capacity_gb = str(free_cap)
            LOG.info("volume: get pool %s for vol %s", pool.name,
                     volume.name)
            return pool.name
    return None
def check_pool_type(self, volume, dest_pool):
    """Find a volume type on ``dest_pool`` that ``volume`` can be retyped to.

    The volume's current type is looked up by name. A destination type
    is compatible when it is bound to the backend of ``dest_pool`` and
    has the same value for every extra spec of the current type, the
    ``volume_backend_name`` spec excluded. The first compatible type is
    used, provided its name differs from the volume's current type.

    :param volume: volume to retype
    :param dest_pool: candidate destination pool
    :return: name of the target volume type, or None when the volume has
             no type, no compatible type exists, or the first compatible
             type is the one the volume already has
    """
    if not volume.volume_type:
        return None

    src_extra_specs = {}
    for vtype in self.cinder.get_volume_type_list():
        if vtype.name == volume.volume_type:
            # Build a filtered copy: popping the backend name in place
            # would alter the volume type object's own extra_specs.
            src_extra_specs = {
                key: value for key, value in vtype.extra_specs.items()
                if key != 'volume_backend_name'}
            break

    backendname = getattr(dest_pool, 'volume_backend_name')
    candidates = [
        pool_type
        for pool_type in self.get_volume_type_by_name(self.cinder,
                                                      backendname)
        if all(pool_type.extra_specs.get(key) == value
               for key, value in src_extra_specs.items())]

    if candidates and candidates[0].name != volume.volume_type:
        return candidates[0].name
    return None
def retype_fit(self, volume, threshold):
    """Pick a volume type that moves ``volume`` to a destination pool.

    Destination pools are tried from the largest free/total ratio
    downwards. A pool qualifies when at least one volume type is bound
    to its backend, the volume fits within its over-subscription limit,
    the pool would still have more than ``(1 - threshold)`` of its
    capacity free, and check_pool_type() finds a target type for it.

    The chosen pool's ``free_capacity_gb`` is reduced by the volume size
    so that later calls see the space as taken.

    :param volume: volume to retype
    :param threshold: volume threshold, as a fraction of total capacity
    :return: name of the target volume type, or None when no pool fits
    """
    self.dest_pools.sort(
        key=lambda p: float(p.free_capacity_gb) /
        float(p.total_capacity_gb))
    for pool in reversed(self.dest_pools):
        backendname = getattr(pool, 'volume_backend_name')
        pool_type = self.get_volume_type_by_name(self.cinder, backendname)
        LOG.info("volume: pool %s, type %s", pool.name, pool_type)
        # get_volume_type_by_name() returns [] (never None) when no type
        # matches, so test for emptiness to skip such pools early.
        if not pool_type:
            continue
        volume_size = float(volume.size)
        total_cap = float(pool.total_capacity_gb)
        allocated = float(pool.allocated_capacity_gb)
        # float() also accepts a ratio supplied as a numeric string,
        # which a bare multiplication would reject with a TypeError.
        ratio = float(pool.max_over_subscription_ratio)
        if total_cap * ratio < allocated + volume_size:
            LOG.info("pool %s allocated over", pool.name)
            continue
        free_cap = float(pool.free_capacity_gb) - volume_size
        if free_cap > (1 - threshold) * total_cap:
            target_type = self.check_pool_type(volume, pool)
            if target_type is None:
                continue
            # Book the space on the pool object itself.
            pool.free_capacity_gb = str(free_cap)
            LOG.info("volume: get type %s for vol %s", target_type,
                     volume.name)
            return target_type
    return None
def get_actions(self, pool, volumes, threshold):
    """Plan the volume moves that bring ``pool`` under the threshold.

    Volumes hosted on ``pool`` are considered in three groups, in this
    order: ``available`` volumes, ``in-use`` non-bootable volumes, then
    ``in-use`` bootable volumes. Within a group the smallest volumes are
    tried first. For each volume a plain migration is attempted
    (migrate_fit) and, failing that, a retype (retype_fit). Planning
    stops as soon as the pool's projected used capacity falls below
    ``threshold * total``.

    :param pool: over-threshold source pool
    :param volumes: candidate volumes (from any pool)
    :param threshold: volume threshold, as a fraction of total capacity
    :return: tuple ``(retype_dicts, migrate_dicts)`` mapping volume id to
             target volume type and to target pool name respectively
    """
    retype_dicts = {}
    migrate_dicts = {}
    total_cap = float(pool.total_capacity_gb)
    used_cap = float(pool.total_capacity_gb) - float(pool.free_capacity_gb)

    volumes_in_pool = [
        v for v in volumes
        if getattr(v, 'os-vol-host-attr:host') == pool.name]
    LOG.info("volumes in pool: %s", str(volumes_in_pool))
    if not volumes_in_pool:
        return retype_dicts, migrate_dicts

    # (log message, selection predicate) for each group, in the order
    # the groups are tried.
    volume_groups = (
        ("available volumes in pool: %s ",
         lambda v: v.status == 'available'),
        ("noboot volumes: %s ",
         lambda v: v.bootable.lower() == 'false' and
         v.status == 'in-use'),
        ("boot volumes: %s ",
         lambda v: v.bootable.lower() == 'true' and
         v.status == 'in-use'),
    )
    for message, selected in volume_groups:
        candidates = sorted(filter(selected, volumes_in_pool),
                            key=lambda v: float(v.size))
        LOG.info(message, str(candidates))
        for vol in candidates:
            migrate_pool = self.migrate_fit(vol, threshold)
            if migrate_pool:
                migrate_dicts[vol.id] = migrate_pool
            else:
                target_type = self.retype_fit(vol, threshold)
                if not target_type:
                    # Nowhere to put this volume; try the next one.
                    continue
                retype_dicts[vol.id] = target_type
            used_cap -= float(vol.size)
            if used_cap < threshold * total_cap:
                # The pool is back under the threshold.
                return retype_dicts, migrate_dicts
    return retype_dicts, migrate_dicts
def pre_execute(self):
    """Log the start of the strategy and load ``volume_threshold``.

    The threshold is read from the ``volume_threshold`` input parameter.
    """
    # The display name already ends with "Strategy", so the word is not
    # appended again; %-style arguments leave formatting to the logger.
    LOG.info("Initializing %s", self.get_display_name())
    self.volume_threshold = self.input_parameters.volume_threshold
def do_execute(self, audit=None):
    """Strategy execution phase

    Splits the pools into over-threshold sources and under-threshold
    destinations, then adds one ``volume_migrate`` action to the
    solution for every move planned by get_actions(). For each source
    pool the retype actions are added before the migrate actions.
    Returns without adding actions when either group of pools is empty.

    :param audit: not used by this method
    """
    all_pools = self.get_pools(self.cinder)
    all_volumes = self.get_volumes(self.cinder)
    threshold = float(self.volume_threshold) / 100
    self.source_pools, self.dest_pools = self.group_pools(
        all_pools, threshold)
    LOG.info(" source pools: %s dest pools:%s",
             self.source_pools, self.dest_pools)
    if not self.source_pools:
        LOG.info("No pools require optimization")
        return

    if not self.dest_pools:
        LOG.info("No enough pools for optimization")
        return

    # Index the volumes once instead of rescanning the list per action;
    # setdefault keeps the first volume seen for an id.
    volumes_by_id = {}
    for volume in all_volumes:
        volumes_by_id.setdefault(volume.id, volume)

    for source_pool in self.source_pools:
        retype_actions, migrate_actions = self.get_actions(
            source_pool, all_volumes, threshold)
        planned = (
            ('retype', 'destination_type', retype_actions),
            ('migrate', 'destination_node', migrate_actions),
        )
        for migration_type, destination_key, actions in planned:
            for vol_id, destination in actions.items():
                parameters = {
                    'migration_type': migration_type,
                    destination_key: destination,
                    'resource_name': volumes_by_id[vol_id].name}
                self.solution.add_action(action_type='volume_migrate',
                                         resource_id=vol_id,
                                         input_parameters=parameters)
def post_execute(self):
    """Post-execution phase

    Sets both efficacy indicators on the solution to zero.
    """
    indicators = {
        'instance_migrations_count': 0,
        'instances_count': 0,
    }
    self.solution.set_efficacy_indicators(**indicators)