Coverage for watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py: 88%
256 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2#
3# Authors: Vojtech CIMA <cima@zhaw.ch>
4# Bruno GRAZIOLI <gaea@zhaw.ch>
5# Sean MURPHY <murp@zhaw.ch>
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16# implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19#
21import collections
23from oslo_log import log
24import oslo_utils
26from watcher._i18n import _
27from watcher.applier.actions import migration
28from watcher.common import exception
29from watcher.decision_engine.model import element
30from watcher.decision_engine.strategy.strategies import base
# Module-level logger shared by the whole strategy.
LOG = log.getLogger(__name__)
class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
    """VM Workload Consolidation Strategy

    A load consolidation strategy based on a heuristic first-fit
    algorithm which focuses on measured CPU utilization and tries to
    minimize hosts which have too much or too little load, respecting
    resource capacity constraints.

    This strategy produces a solution resulting in more efficient
    utilization of cluster resources using the following four phases:

    * Offload phase - handling over-utilized resources
    * Consolidation phase - handling under-utilized resources
    * Solution optimization - reducing number of migrations
    * Disabling of unused compute nodes

    Capacity coefficients (cc) may be used to adjust optimization
    thresholds. Different resources may require different coefficient
    values; using different coefficients in the two phases may lead to
    a more efficient consolidation in the end.
    If the cc equals 1 the full resource capacity may be used, cc
    values lower than 1 will lead to resource under-utilization and
    values higher than 1 will lead to resource overbooking.
    e.g. If the targeted utilization is 80 percent of a compute node
    capacity, the coefficient in the consolidation phase will be 0.8,
    but may be any lower value in the offloading phase. The lower it
    gets, the more released (distributed) the cluster will appear for
    the following consolidation phase.

    As this strategy leverages VM live migration to move the load
    from one compute node to another, this feature needs to be set up
    correctly on all compute nodes within the cluster.
    This strategy assumes it is possible to live migrate any VM from
    an active compute node to any other active compute node.
    """

    # Aggregation function applied to every datasource statistics query.
    AGGREGATE = 'mean'
    # Metrics this strategy requires from the configured datasource.
    DATASOURCE_METRICS = ['instance_ram_allocated', 'instance_cpu_usage',
                          'instance_ram_usage', 'instance_root_disk_size',
                          'host_cpu_usage', 'host_ram_usage']

    # Action type identifiers used when building the solution.
    MIGRATION = "migrate"
    CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
79 def __init__(self, config, osc=None):
80 super(VMWorkloadConsolidation, self).__init__(config, osc)
81 self.number_of_migrations = 0
82 self.number_of_released_nodes = 0
83 self.datasource_instance_data_cache = dict()
84 self.datasource_node_data_cache = dict()
85 # Host metric adjustments that take into account planned
86 # migrations.
87 self.host_metric_delta = collections.defaultdict(
88 lambda: collections.defaultdict(int))
    @classmethod
    def get_name(cls):
        """Return the unique strategy entry-point name."""
        return "vm_workload_consolidation"
    @classmethod
    def get_display_name(cls):
        """Return the translated human-readable strategy name."""
        return _("VM Workload Consolidation Strategy")
    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated display name (translation key)."""
        return "VM Workload Consolidation Strategy"
    @property
    def period(self):
        """Statistics aggregation period in seconds (default 3600)."""
        return self.input_parameters.get('period', 3600)
    @property
    def granularity(self):
        """Seconds between two measures in a timeseries (default 300)."""
        return self.input_parameters.get('granularity', 300)
    @classmethod
    def get_schema(cls):
        """Return the JSON schema describing the strategy parameters."""
        # Mandatory default setting for each element
        return {
            "properties": {
                "period": {
                    "description": "The time interval in seconds for "
                                   "getting statistic aggregation",
                    "type": "number",
                    "default": 3600
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "default": 300
                },
            }
        }
130 def get_available_compute_nodes(self):
131 default_node_scope = [element.ServiceState.ENABLED.value,
132 element.ServiceState.DISABLED.value]
133 nodes = self.compute_model.get_all_compute_nodes().items()
134 return {uuid: cn for uuid, cn in
135 nodes
136 if cn.state == element.ServiceState.ONLINE.value and
137 cn.status in default_node_scope}
    def get_instance_state_str(self, instance):
        """Get instance state in string format.

        :param instance: instance object
        :return: the instance state as a plain string
        :raises: exception.WatcherException if the state is neither a
                 string nor an element.InstanceState
        """
        # The str check must come first: an InstanceState value may
        # itself be usable, but plain strings are the common case.
        if isinstance(instance.state, str):
            return instance.state
        elif isinstance(instance.state, element.InstanceState):
            return instance.state.value
        else:
            LOG.error('Unexpected instance state type, '
                      'state=%(state)s, state_type=%(st)s.',
                      dict(state=instance.state,
                           st=type(instance.state)))
            raise exception.WatcherException
    def get_node_status_str(self, node):
        """Get node status in string format.

        :param node: node object
        :return: the node status as a plain string
        :raises: exception.WatcherException if the status is neither a
                 string nor an element.ServiceState
        """
        if isinstance(node.status, str):
            return node.status
        elif isinstance(node.status, element.ServiceState):
            return node.status.value
        else:
            LOG.error('Unexpected node status type, '
                      'status=%(status)s, status_type=%(st)s.',
                      dict(status=node.status,
                           st=type(node.status)))
            raise exception.WatcherException
171 def add_action_enable_compute_node(self, node):
172 """Add an action for node enabler into the solution.
174 :param node: node object
175 :return: None
176 """
177 params = {'state': element.ServiceState.ENABLED.value,
178 'resource_name': node.hostname}
179 self.solution.add_action(
180 action_type=self.CHANGE_NOVA_SERVICE_STATE,
181 resource_id=node.uuid,
182 input_parameters=params)
183 self.number_of_released_nodes -= 1
185 def add_action_disable_node(self, node):
186 """Add an action for node disability into the solution.
188 :param node: node object
189 :return: None
190 """
191 params = {'state': element.ServiceState.DISABLED.value,
192 'disabled_reason': self.REASON_FOR_DISABLE,
193 'resource_name': node.hostname}
194 self.solution.add_action(
195 action_type=self.CHANGE_NOVA_SERVICE_STATE,
196 resource_id=node.uuid,
197 input_parameters=params)
198 self.number_of_released_nodes += 1
    def add_migration(self, instance, source_node, destination_node):
        """Add an action for VM migration into the solution.

        Live migration is chosen for ACTIVE/PAUSED instances and cold
        migration for STOPPED ones; other states are skipped with an
        error log. On success the in-memory model and the per-host
        metric deltas are updated so later placement decisions account
        for this planned move.

        :param instance: instance object
        :param source_node: node object
        :param destination_node: node object
        :return: None
        """
        instance_state_str = self.get_instance_state_str(instance)
        if instance_state_str in (element.InstanceState.ACTIVE.value,
                                  element.InstanceState.PAUSED.value):
            migration_type = migration.Migrate.LIVE_MIGRATION
        elif instance_state_str == element.InstanceState.STOPPED.value:
            migration_type = migration.Migrate.COLD_MIGRATION
        else:
            LOG.error(
                'Cannot live migrate: instance_uuid=%(instance_uuid)s, '
                'state=%(instance_state)s.', dict(
                    instance_uuid=instance.uuid,
                    instance_state=instance_state_str))
            return

        # NOTE: migrating several VMs to the same disabled destination
        # adds a duplicate enable action for that node each time;
        # consider de-duplicating these actions in the solution.
        destination_node_status_str = self.get_node_status_str(
            destination_node)
        if destination_node_status_str == element.ServiceState.DISABLED.value:
            self.add_action_enable_compute_node(destination_node)

        if self.compute_model.migrate_instance(
                instance, source_node, destination_node):
            self.add_action_migrate(
                instance,
                migration_type,
                source_node,
                destination_node)
            self.number_of_migrations += 1

            # Record the planned load change on both hosts so that
            # subsequent utilization queries see this migration.
            instance_util = self.get_instance_utilization(instance)
            self.host_metric_delta[source_node.hostname]['cpu'] -= (
                instance_util['cpu'])
            # We'll deduce the vm allocated memory.
            self.host_metric_delta[source_node.hostname]['ram'] -= (
                instance.memory)

            self.host_metric_delta[destination_node.hostname]['cpu'] += (
                instance_util['cpu'])
            self.host_metric_delta[destination_node.hostname]['ram'] += (
                instance.memory)
251 def disable_unused_nodes(self):
252 """Generate actions for disabling unused nodes.
254 :return: None
255 """
256 for node in self.get_available_compute_nodes().values():
257 if (len(self.compute_model.get_node_instances(node)) == 0 and
258 node.status !=
259 element.ServiceState.DISABLED.value):
260 self.add_action_disable_node(node)
262 def get_instance_utilization(self, instance):
263 """Collect cpu, ram and disk utilization statistics of a VM.
265 :param instance: instance object
266 :param aggr: string
267 :return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
268 """
269 instance_cpu_util = None
270 instance_ram_util = None
271 instance_disk_util = None
273 if instance.uuid in self.datasource_instance_data_cache.keys():
274 return self.datasource_instance_data_cache.get(instance.uuid)
276 instance_cpu_util = self.datasource_backend.get_instance_cpu_usage(
277 resource=instance, period=self.period,
278 aggregate=self.AGGREGATE, granularity=self.granularity)
279 instance_ram_util = self.datasource_backend.get_instance_ram_usage(
280 resource=instance, period=self.period,
281 aggregate=self.AGGREGATE, granularity=self.granularity)
282 if not instance_ram_util: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 instance_ram_util = (
284 self.datasource_backend.get_instance_ram_allocated(
285 resource=instance, period=self.period,
286 aggregate=self.AGGREGATE, granularity=self.granularity))
287 instance_disk_util = (
288 self.datasource_backend.get_instance_root_disk_size(
289 resource=instance, period=self.period,
290 aggregate=self.AGGREGATE, granularity=self.granularity))
292 if instance_cpu_util: 292 ↛ 296line 292 didn't jump to line 296 because the condition on line 292 was always true
293 total_cpu_utilization = (
294 instance.vcpus * (instance_cpu_util / 100.0))
295 else:
296 total_cpu_utilization = instance.vcpus
298 if not instance_ram_util: 298 ↛ 299line 298 didn't jump to line 299 because the condition on line 298 was never true
299 instance_ram_util = instance.memory
300 LOG.warning('No values returned by %s for memory.resident, '
301 'use instance flavor ram value', instance.uuid)
303 if not instance_disk_util: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true
304 instance_disk_util = instance.disk
305 LOG.warning('No values returned by %s for disk.root.size, '
306 'use instance flavor disk value', instance.uuid)
308 self.datasource_instance_data_cache[instance.uuid] = dict(
309 cpu=total_cpu_utilization, ram=instance_ram_util,
310 disk=instance_disk_util)
311 return self.datasource_instance_data_cache.get(instance.uuid)
313 def _get_node_total_utilization(self, node):
314 if node.hostname in self.datasource_node_data_cache:
315 return self.datasource_node_data_cache[node.hostname]
317 cpu = self.datasource_backend.get_host_cpu_usage(
318 node, self.period, self.AGGREGATE,
319 self.granularity)
320 ram = self.datasource_backend.get_host_ram_usage(
321 node, self.period, self.AGGREGATE,
322 self.granularity)
324 self.datasource_node_data_cache[node.hostname] = dict(
325 cpu=cpu, ram=ram)
326 return self.datasource_node_data_cache[node.hostname]
    def get_node_utilization(self, node):
        """Collect cpu, ram and disk utilization statistics of a node.

        Combines two views of the node load: the sum of the per-instance
        metrics and the host-level metrics (adjusted by the deltas of
        already-planned migrations), returning the maximum of the two
        for cpu and ram.

        :param node: node object
        :return: dict(cpu(number of cores used), ram(MB used),
                 disk(sum of instance root disk sizes — same unit as
                 the instance disk metric, i.e. GB))
        """
        node_instances = self.compute_model.get_node_instances(node)
        node_ram_util = 0
        node_disk_util = 0
        node_cpu_util = 0
        # Aggregate the per-instance utilization of everything hosted
        # on this node.
        for instance in node_instances:
            instance_util = self.get_instance_utilization(
                instance)
            node_cpu_util += instance_util['cpu']
            node_ram_util += instance_util['ram']
            node_disk_util += instance_util['disk']
            LOG.debug("instance utilization: %s %s",
                      instance, instance_util)

        total_node_util = self._get_node_total_utilization(node)
        total_node_cpu_util = total_node_util['cpu'] or 0
        if total_node_cpu_util:
            # Host cpu usage is a percentage; convert to core count.
            total_node_cpu_util = total_node_cpu_util * node.vcpus / 100
        # account for planned migrations
        total_node_cpu_util += self.host_metric_delta[node.hostname]['cpu']

        total_node_ram_util = total_node_util['ram'] or 0
        if total_node_ram_util:
            # Divide by Ki (1024) to match the MB unit of the deltas;
            # assumes the datasource reports host ram in KB — TODO
            # confirm against the configured datasource.
            total_node_ram_util /= oslo_utils.units.Ki
        total_node_ram_util += self.host_metric_delta[node.hostname]['ram']

        LOG.debug(
            "node utilization: %s. "
            "total instance cpu: %s, "
            "total instance ram: %s, "
            "total instance disk: %s, "
            "total host cpu: %s, "
            "total host ram: %s, "
            "node delta usage: %s.",
            node,
            node_cpu_util, node_ram_util, node_disk_util,
            total_node_cpu_util, total_node_ram_util,
            self.host_metric_delta[node.hostname])

        # Be conservative: take the larger of the instance-derived and
        # host-derived figures for cpu and ram.
        return dict(cpu=max(node_cpu_util, total_node_cpu_util),
                    ram=max(node_ram_util, total_node_ram_util),
                    disk=node_disk_util)
377 def get_node_capacity(self, node):
378 """Collect cpu, ram and disk capacity of a node.
380 :param node: node object
381 :return: dict(cpu(cores), ram(MB), disk(B))
382 """
383 return dict(cpu=node.vcpu_capacity, ram=node.memory_mb_capacity,
384 disk=node.disk_gb_capacity)
386 def get_relative_node_utilization(self, node):
387 """Return relative node utilization.
389 :param node: node object
390 :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
391 """
392 relative_node_utilization = {}
393 util = self.get_node_utilization(node)
394 cap = self.get_node_capacity(node)
395 for k in util.keys():
396 relative_node_utilization[k] = float(util[k]) / float(cap[k])
397 return relative_node_utilization
399 def get_relative_cluster_utilization(self):
400 """Calculate relative cluster utilization (rcu).
402 RCU is an average of relative utilizations (rhu) of active nodes.
403 :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
404 """
405 nodes = self.get_available_compute_nodes().values()
406 rcu = {}
407 counters = {}
408 for node in nodes:
409 node_status_str = self.get_node_status_str(node)
410 if node_status_str == element.ServiceState.ENABLED.value: 410 ↛ 408line 410 didn't jump to line 408 because the condition on line 410 was always true
411 rhu = self.get_relative_node_utilization(node)
412 for k in rhu.keys():
413 if k not in rcu:
414 rcu[k] = 0
415 if k not in counters:
416 counters[k] = 0
417 rcu[k] += rhu[k]
418 counters[k] += 1
419 for k in rcu.keys():
420 rcu[k] /= counters[k]
421 return rcu
423 def is_overloaded(self, node, cc):
424 """Indicate whether a node is overloaded.
426 This considers provided resource capacity coefficients (cc).
427 :param node: node object
428 :param cc: dictionary containing resource capacity coefficients
429 :return: [True, False]
430 """
431 node_capacity = self.get_node_capacity(node)
432 node_utilization = self.get_node_utilization(
433 node)
434 metrics = ['cpu']
435 for m in metrics:
436 if node_utilization[m] > node_capacity[m] * cc[m]:
437 return True
438 return False
440 def instance_fits(self, instance, node, cc):
441 """Indicate whether is a node able to accommodate a VM.
443 This considers provided resource capacity coefficients (cc).
444 :param instance: :py:class:`~.element.Instance`
445 :param node: node object
446 :param cc: dictionary containing resource capacity coefficients
447 :return: [True, False]
448 """
449 node_capacity = self.get_node_capacity(node)
450 node_utilization = self.get_node_utilization(node)
451 instance_utilization = self.get_instance_utilization(instance)
452 metrics = ['cpu', 'ram', 'disk']
453 for m in metrics:
454 fits = (instance_utilization[m] + node_utilization[m] <=
455 node_capacity[m] * cc[m])
456 LOG.debug(
457 "Instance fits: %s, metric: %s, instance: %s, "
458 "node: %s, instance utilization: %s, "
459 "node utilization: %s, node capacity: %s, cc: %s",
460 fits, m, instance, node, instance_utilization[m],
461 node_utilization[m], node_capacity[m], cc[m])
462 if not fits:
463 return False
464 return True
    def optimize_solution(self):
        """Optimize solution.

        This is done by eliminating unnecessary or circular sets of
        migrations which can be replaced by a more efficient solution.
        e.g.:

        * A->B, B->C => replace migrations A->B, B->C with
          a single migration A->C as both solutions result in
          VM running on node C which can be achieved with
          one migration instead of two.
        * A->B, B->A => remove A->B and B->A as they do not result
          in a new VM placement.
        """
        migrate_actions = (
            a for a in self.solution.actions if a[
                'action_type'] == self.MIGRATION)
        instance_to_be_migrated = (
            a['input_parameters']['resource_id'] for a in migrate_actions)
        instance_uuids = list(set(instance_to_be_migrated))
        for instance_uuid in instance_uuids:
            # All actions that touch this instance, in insertion order.
            actions = list(
                a for a in self.solution.actions if a[
                    'input_parameters'][
                    'resource_id'] == instance_uuid)
            if len(actions) > 1:
                # Collapse the chain: keep the first source and the
                # last destination, drop every intermediate hop.
                src_name = actions[0]['input_parameters']['source_node']
                dst_name = actions[-1]['input_parameters']['destination_node']
                for a in actions:
                    self.solution.actions.remove(a)
                    self.number_of_migrations -= 1
                LOG.info("Optimized migrations: %s. "
                         "Source: %s, destination: %s", actions,
                         src_name, dst_name)
                src_node = self.compute_model.get_node_by_name(src_name)
                dst_node = self.compute_model.get_node_by_name(dst_name)
                instance = self.compute_model.get_instance_by_uuid(
                    instance_uuid)
                # Roll the model back to the original placement, then
                # re-plan the single direct migration (a no-op when the
                # chain was circular and src == dst).
                if self.compute_model.migrate_instance(
                        instance, dst_node, src_node):
                    self.add_migration(instance, src_node, dst_node)
    def offload_phase(self, cc):
        """Perform offloading phase.

        This considers provided resource capacity coefficients.
        Offload phase performing first-fit based bin packing to offload
        overloaded nodes. This is done in a fashion of moving
        the least CPU utilized VM first as live migration these
        generally causes less troubles. This phase results in a cluster
        with no overloaded nodes.
        * This phase is be able to enable disabled nodes (if needed
        and any available) in the case of the resource capacity provided by
        active nodes is not able to accommodate all the load.
        As the offload phase is later followed by the consolidation phase,
        the node enabler in this phase doesn't necessarily results
        in more enabled nodes in the final solution.

        :param cc: dictionary containing resource capacity coefficients
        """
        # Nodes ordered by ascending cpu utilization; we walk them from
        # the most loaded end.
        sorted_nodes = sorted(
            self.get_available_compute_nodes().values(),
            key=lambda x: self.get_node_utilization(x)['cpu'])
        for node in reversed(sorted_nodes):
            if self.is_overloaded(node, cc):
                # Move the least cpu-utilized instances away first.
                for instance in sorted(
                        self.compute_model.get_node_instances(node),
                        key=lambda x: self.get_instance_utilization(
                            x)['cpu']
                ):
                    LOG.info("Node %s overloaded, attempting to reduce load.",
                             node)
                    # skip exclude instance when migrating
                    if instance.watcher_exclude:
                        LOG.debug("Instance is excluded by scope, "
                                  "skipped: %s", instance.uuid)
                        continue
                    # First-fit: try destinations from the most loaded
                    # node downwards.
                    for destination_node in reversed(sorted_nodes):
                        if self.instance_fits(
                                instance, destination_node, cc):
                            LOG.info("Offload: found fitting "
                                     "destination (%s) for instance: %s. "
                                     "Planning migration.",
                                     destination_node, instance.uuid)
                            self.add_migration(instance, node,
                                               destination_node)
                            break
                    # Stop moving instances as soon as the node is no
                    # longer overloaded.
                    if not self.is_overloaded(node, cc):
                        LOG.info("Node %s no longer overloaded.", node)
                        break
                    else:
                        LOG.info("Node still overloaded (%s), "
                                 "continuing offload phase.", node)
    def consolidation_phase(self, cc):
        """Perform consolidation phase.

        This considers provided resource capacity coefficients.
        Consolidation phase performing first-fit based bin packing.
        First, nodes with the lowest cpu utilization are consolidated
        by moving their load to nodes with the highest cpu utilization
        which can accommodate the load. In this phase the most cpu utilized
        VMs are prioritized as their load is more difficult to accommodate
        in the system than less cpu utilized VMs which can be later used
        to fill smaller CPU capacity gaps.

        :param cc: dictionary containing resource capacity coefficients
        """
        sorted_nodes = sorted(
            self.get_available_compute_nodes().values(),
            key=lambda x: self.get_node_utilization(x)['cpu'])
        # asc: index of the current source node (ascending utilization);
        # dsc: index of the candidate destination (descending). A move
        # is only considered while the destination is more utilized
        # than the source (asc < dsc), so load flows low -> high.
        asc = 0
        for node in sorted_nodes:
            instances = sorted(
                self.compute_model.get_node_instances(node),
                key=lambda x: self.get_instance_utilization(x)['cpu'])
            # Most cpu-hungry instances are placed first.
            for instance in reversed(instances):
                # skip exclude instance when migrating
                if instance.watcher_exclude:
                    LOG.debug("Instance is excluded by scope, "
                              "skipped: %s", instance.uuid)
                    continue
                dsc = len(sorted_nodes) - 1
                for destination_node in reversed(sorted_nodes):
                    if asc >= dsc:
                        break
                    if self.instance_fits(
                            instance, destination_node, cc):
                        LOG.info("Consolidation: found fitting "
                                 "destination (%s) for instance: %s. "
                                 "Planning migration.",
                                 destination_node, instance.uuid)
                        self.add_migration(instance, node,
                                           destination_node)
                        break
                    dsc -= 1
            asc += 1
    def pre_execute(self):
        """Run the shared pre-execution checks defined by the base class."""
        self._pre_execute()
607 def do_execute(self, audit=None):
608 """Execute strategy.
610 This strategy produces a solution resulting in more
611 efficient utilization of cluster resources using following
612 four phases:
614 * Offload phase - handling over-utilized resources
615 * Consolidation phase - handling under-utilized resources
616 * Solution optimization - reducing number of migrations
617 * Disability of unused nodes
619 :param original_model: root_model object
620 """
621 LOG.info('Executing Smart Strategy')
622 rcu = self.get_relative_cluster_utilization()
624 cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
626 # Offloading phase
627 self.offload_phase(cc)
629 # Consolidation phase
630 self.consolidation_phase(cc)
632 # Optimize solution
633 self.optimize_solution()
635 # disable unused nodes
636 self.disable_unused_nodes()
638 rcu_after = self.get_relative_cluster_utilization()
639 info = {
640 "compute_nodes_count": len(
641 self.get_available_compute_nodes()),
642 'number_of_migrations': self.number_of_migrations,
643 'number_of_released_nodes':
644 self.number_of_released_nodes,
645 'relative_cluster_utilization_before': str(rcu),
646 'relative_cluster_utilization_after': str(rcu_after)
647 }
649 LOG.debug(info)
651 def post_execute(self):
652 self.solution.set_efficacy_indicators(
653 compute_nodes_count=len(
654 self.get_available_compute_nodes()),
655 released_compute_nodes_count=self.number_of_released_nodes,
656 instance_migrations_count=self.number_of_migrations,
657 )
659 LOG.debug(self.compute_model.to_string())