Coverage for watcher/decision_engine/strategy/strategies/basic_consolidation.py: 93% (165 statements)
coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from oslo_config import cfg
from oslo_log import log

from watcher._i18n import _
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

LOG = log.getLogger(__name__)


class BasicConsolidation(base.ServerConsolidationBaseStrategy):
    """Good server consolidation strategy

    Basic offline consolidation using live migration

    Consolidation of VMs is essential to achieve energy optimization in cloud
    environments such as OpenStack. As VMs are spun up and/or moved over
    time, it becomes necessary to migrate VMs among servers to lower the
    costs. However, migration of VMs introduces runtime overheads and
    consumes extra energy, thus a good server consolidation strategy should
    carefully plan for migration in order to both minimize energy consumption
    and comply with the various SLAs.

    This algorithm not only minimizes the overall number of used servers,
    but also minimizes the number of migrations.

    It has been developed only for tests. You must have at least 2 physical
    compute nodes to run it, so you can easily run it on DevStack. It assumes
    that live migration is possible on your OpenStack cluster.
    """

    DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage']

    CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"

    def __init__(self, config, osc=None):
        """Basic offline Consolidation using live migration

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(BasicConsolidation, self).__init__(config, osc)

        # set default value for the number of enabled compute nodes
        self.number_of_enabled_nodes = 0
        # set default value for the number of released nodes
        self.number_of_released_nodes = 0
        # set default value for the number of migrations
        self.number_of_migrations = 0

        # set default value for the efficacy
        self.efficacy = 100

        # TODO(jed): improve threshold overbooking?
        self.threshold_mem = 1
        self.threshold_disk = 1
        self.threshold_cores = 1

    @classmethod
    def get_name(cls):
        return "basic"

    @property
    def migration_attempts(self):
        return self.input_parameters.get('migration_attempts', 0)

    @property
    def period(self):
        return self.input_parameters.get('period', 7200)

    @property
    def granularity(self):
        return self.input_parameters.get('granularity', 300)

    @property
    def aggregation_method(self):
        return self.input_parameters.get(
            'aggregation_method', {
                "instance": 'mean',
                "compute_node": 'mean',
                "node": ''
            }
        )

    @classmethod
    def get_display_name(cls):
        return _("Basic offline consolidation")

    @classmethod
    def get_translatable_display_name(cls):
        return "Basic offline consolidation"

    @classmethod
    def get_schema(cls):
        # Mandatory default setting for each element
        return {
            "properties": {
                "migration_attempts": {
                    "description": "Maximum number of combinations to be "
                                   "tried by the strategy while searching "
                                   "for potential candidates. To remove the "
                                   "limit, set it to 0 (by default)",
                    "type": "number",
                    "default": 0
                },
                "period": {
                    "description": "The time interval in seconds for "
                                   "getting statistic aggregation",
                    "type": "number",
                    "default": 7200
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "default": 300
                },
                "aggregation_method": {
                    "description": "Function used to aggregate multiple "
                                   "measures into an aggregate. For example, "
                                   "the min aggregation method will aggregate "
                                   "the values of different measures to the "
                                   "minimum value of all the measures in the "
                                   "time range.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "compute_node": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "node": {
                            "type": "string",
                            # node is deprecated
                            "default": ''
                        },
                    },
                    "default": {
                        "instance": 'mean',
                        "compute_node": 'mean',
                        # node is deprecated
                        "node": '',
                    }
                },
            },
        }
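
    # Illustrative sketch only (not part of the audited module): once the
    # schema defaults above are applied, an audit's input_parameters for this
    # strategy would resolve to a mapping like the one below; the values
    # simply restate the documented defaults.
    #
    #     {
    #         "migration_attempts": 0,
    #         "period": 7200,
    #         "granularity": 300,
    #         "aggregation_method": {"instance": "mean",
    #                                "compute_node": "mean",
    #                                "node": ""},
    #     }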

    @classmethod
    def get_config_opts(cls):
        return super(BasicConsolidation, cls).get_config_opts() + [
            cfg.BoolOpt(
                'check_optimize_metadata',
                help='Check optimize metadata field in instance before'
                     ' migration',
                default=False),
        ]

    def get_available_compute_nodes(self):
        default_node_scope = [element.ServiceState.ENABLED.value,
                              element.ServiceState.DISABLED.value]
        return {uuid: cn for uuid, cn in
                self.compute_model.get_all_compute_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def check_migration(self, source_node, destination_node,
                        instance_to_migrate):
        """Check if the migration is possible

        :param source_node: the current node of the virtual machine
        :param destination_node: the destination of the virtual machine
        :param instance_to_migrate: the instance / virtual machine
        :return: True if there is enough space, otherwise False
        """
        if source_node == destination_node:
            return False

        LOG.debug('Migrate instance %s from %s to %s',
                  instance_to_migrate, source_node, destination_node)

        used_resources = self.compute_model.get_node_used_resources(
            destination_node)

        # capacity requested on the destination compute node
        total_cores = used_resources['vcpu'] + instance_to_migrate.vcpus
        total_disk = used_resources['disk'] + instance_to_migrate.disk
        total_mem = used_resources['memory'] + instance_to_migrate.memory

        return self.check_threshold(destination_node, total_cores, total_disk,
                                    total_mem)

    def check_threshold(self, destination_node, total_cores,
                        total_disk, total_mem):
        """Check threshold

        Check that the ratio of the aggregated CPU capacity of the VMs on
        the node to the CPU capacity of that node does not exceed the
        threshold value.

        :param destination_node: the destination of the virtual machine
        :param total_cores: total cores of the virtual machine
        :param total_disk: total disk size used by the virtual machine
        :param total_mem: total memory used by the virtual machine
        :return: True if the threshold is not exceeded
        """
        cpu_capacity = destination_node.vcpu_capacity
        disk_capacity = destination_node.disk_gb_capacity
        memory_capacity = destination_node.memory_mb_capacity

        return (cpu_capacity >= total_cores * self.threshold_cores and
                disk_capacity >= total_disk * self.threshold_disk and
                memory_capacity >= total_mem * self.threshold_mem)
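
    # Worked example (hypothetical numbers, with threshold_cores == 1): a
    # destination node exposing vcpu_capacity == 16 accepts a migration as
    # long as the already used vCPUs plus the incoming instance's vCPUs stay
    # within 16:
    #
    #     16 >= (14 + 2) * 1  -> True, migration allowed
    #     16 >= (15 + 2) * 1  -> False, migration rejected
    #
    # The same comparison is applied to disk and memory with their own
    # thresholds.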

    def calculate_weight(self, compute_resource, total_cores_used,
                         total_disk_used, total_memory_used):
        """Calculate weight of every resource

        :param compute_resource: compute node or instance whose capacity
            is evaluated
        :param total_cores_used: number of vCPUs used
        :param total_disk_used: disk size used
        :param total_memory_used: amount of memory used
        :return: average of the per-resource utilization scores
        """
        cpu_capacity = compute_resource.vcpus
        disk_capacity = compute_resource.disk
        memory_capacity = compute_resource.memory

        score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
                       float(cpu_capacity))

        # It's possible that disk_capacity is 0, e.g., m1.nano.disk = 0
        if disk_capacity == 0:
            score_disk = 0
        else:
            score_disk = (1 - (float(disk_capacity) - float(total_disk_used)) /
                          float(disk_capacity))

        score_memory = (
            1 - (float(memory_capacity) - float(total_memory_used)) /
            float(memory_capacity))
        # TODO(jed): take weights into account
        return (score_cores + score_disk + score_memory) / 3
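
    # Worked example (hypothetical numbers): for a resource with 16 vCPUs,
    # 100 GB of disk and 32768 MB of memory, of which 8 vCPUs, 50 GB and
    # 16384 MB are used, each partial score is 1 - free/capacity = 0.5, so
    # the returned weight is (0.5 + 0.5 + 0.5) / 3 = 0.5.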

    def get_compute_node_cpu_usage(self, compute_node):
        return self.datasource_backend.get_host_cpu_usage(
            compute_node, self.period, self.aggregation_method['compute_node'],
            self.granularity)

    def get_instance_cpu_usage(self, instance):
        return self.datasource_backend.get_instance_cpu_usage(
            instance, self.period, self.aggregation_method['instance'],
            self.granularity)

    def calculate_score_node(self, node):
        """Calculate the score that represents the utilization level

        :param node: :py:class:`~.ComputeNode` instance
        :return: Score for the given compute node
        :rtype: float
        """
        host_avg_cpu_util = self.get_compute_node_cpu_usage(node)

        if host_avg_cpu_util is None:  # coverage: condition never true in tests
            resource_id = "%s_%s" % (node.uuid, node.hostname)
            LOG.error(
                "No values returned by %(resource_id)s "
                "for %(metric_name)s", dict(
                    resource_id=resource_id,
                    metric_name='host_cpu_usage'))
            host_avg_cpu_util = 100

        total_cores_used = node.vcpus * (host_avg_cpu_util / 100.0)

        return self.calculate_weight(node, total_cores_used, 0, 0)

    def calculate_score_instance(self, instance):
        """Calculate the score of a virtual machine

        :param instance: the virtual machine
        :return: score
        """
        instance_cpu_utilization = self.get_instance_cpu_usage(instance)
        if instance_cpu_utilization is None:  # coverage: condition never true in tests
            LOG.error(
                "No values returned by %(resource_id)s "
                "for %(metric_name)s", dict(
                    resource_id=instance.uuid,
                    metric_name='instance_cpu_usage'))
            instance_cpu_utilization = 100

        total_cores_used = instance.vcpus * (instance_cpu_utilization / 100.0)

        return self.calculate_weight(instance, total_cores_used, 0, 0)

    def add_action_disable_node(self, node):
        parameters = {'state': element.ServiceState.DISABLED.value,
                      'disabled_reason': self.REASON_FOR_DISABLE,
                      'resource_name': node.hostname}
        self.solution.add_action(action_type=self.CHANGE_NOVA_SERVICE_STATE,
                                 resource_id=node.uuid,
                                 input_parameters=parameters)

    def compute_score_of_nodes(self):
        """Calculate score of nodes based on load by VMs"""
        score = []
        for node in self.get_available_compute_nodes().values():
            if node.status == element.ServiceState.ENABLED.value:  # coverage: condition always true in tests
                self.number_of_enabled_nodes += 1

            instances = self.compute_model.get_node_instances(node)
            if len(instances) > 0:
                result = self.calculate_score_node(node)
                score.append((node.uuid, result))

        return score

    def node_and_instance_score(self, sorted_scores):
        """Get the node to release and the scored list of its VMs"""
        node_to_release = sorted_scores[len(sorted_scores) - 1][0]
        instances = self.compute_model.get_node_instances(
            self.compute_model.get_node_by_uuid(node_to_release))

        instances_to_migrate = self.filter_instances_by_audit_tag(instances)
        instance_score = []
        for instance in instances_to_migrate:
            if instance.state == element.InstanceState.ACTIVE.value:  # coverage: condition always true in tests
                instance_score.append(
                    (instance, self.calculate_score_instance(instance)))

        return node_to_release, instance_score

    def create_migration_instance(self, mig_instance, mig_source_node,
                                  mig_destination_node):
        """Create migration VM"""
        if self.compute_model.migrate_instance(  # coverage: condition always true in tests
                mig_instance, mig_source_node, mig_destination_node):
            self.add_action_migrate(mig_instance, 'live',
                                    mig_source_node,
                                    mig_destination_node)

        if len(self.compute_model.get_node_instances(mig_source_node)) == 0:
            self.add_action_disable_node(mig_source_node)
            self.number_of_released_nodes += 1

    def calculate_num_migrations(self, sorted_instances, node_to_release,
                                 sorted_score):
        number_migrations = 0
        for mig_instance, __ in sorted_instances:
            # skip excluded instances when migrating
            if mig_instance.watcher_exclude:  # coverage: condition never true in tests
                LOG.debug("Instance is excluded by scope, "
                          "skipped: %s", mig_instance.uuid)
                continue
            for node_uuid, __ in sorted_score:
                mig_source_node = self.compute_model.get_node_by_uuid(
                    node_to_release)
                mig_destination_node = self.compute_model.get_node_by_uuid(
                    node_uuid)

                result = self.check_migration(
                    mig_source_node, mig_destination_node, mig_instance)
                if result:
                    self.create_migration_instance(
                        mig_instance, mig_source_node, mig_destination_node)
                    number_migrations += 1
                    break
        return number_migrations

    def unsuccessful_migration_actualization(self, number_migrations,
                                             unsuccessful_migration):
        if number_migrations > 0:
            self.number_of_migrations += number_migrations
            return 0
        else:
            return unsuccessful_migration + 1

    def pre_execute(self):
        self._pre_execute()

        # backwards compatibility for node parameter.
        if self.aggregation_method['node'] != '':
            LOG.warning('Parameter node has been renamed to compute_node and '
                        'will be removed in next release.')
            self.aggregation_method['compute_node'] = \
                self.aggregation_method['node']

    def do_execute(self, audit=None):
        unsuccessful_migration = 0

        scores = self.compute_score_of_nodes()
        # Sort compute nodes by Score decreasing
        sorted_scores = sorted(scores, reverse=True, key=lambda x: (x[1]))
        LOG.debug("Compute node(s) BFD %s", sorted_scores)
        # Get Node to be released
        if len(scores) == 0:
            LOG.warning(
                "The workloads of the compute nodes"
                " of the cluster is zero")
            return

        while sorted_scores and (
                not self.migration_attempts or
                self.migration_attempts >= unsuccessful_migration):
            node_to_release, instance_score = self.node_and_instance_score(
                sorted_scores)

            # Sort instances by Score
            sorted_instances = sorted(
                instance_score, reverse=True, key=lambda x: (x[1]))
            # BFD: Best Fit Decrease
            LOG.debug("Instance(s) BFD %s", sorted_instances)

            migrations = self.calculate_num_migrations(
                sorted_instances, node_to_release, sorted_scores)

            unsuccessful_migration = self.unsuccessful_migration_actualization(
                migrations, unsuccessful_migration)

            if not migrations:
                # We don't have any possible migrations to perform on this node
                # so we discard the node so we can try to migrate instances
                # from the next one in the list
                sorted_scores.pop()

        infos = {
            "compute_nodes_count": self.number_of_enabled_nodes,
            "released_compute_nodes_count": self.number_of_released_nodes,
            "instance_migrations_count": self.number_of_migrations,
            "efficacy": self.efficacy
        }
        LOG.debug(infos)

    def post_execute(self):
        self.solution.set_efficacy_indicators(
            compute_nodes_count=self.number_of_enabled_nodes,
            released_compute_nodes_count=self.number_of_released_nodes,
            instance_migrations_count=self.number_of_migrations,
        )
        LOG.debug(self.compute_model.to_string())
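
# A minimal usage sketch, assuming an already configured BasicConsolidation
# instance (named ``strategy`` here, a hypothetical variable) with a populated
# compute model and data source; strategies are normally instantiated and
# driven by the Watcher decision engine rather than called directly.
#
#     strategy.pre_execute()        # scope checks + deprecated 'node' shim
#     strategy.do_execute()         # best-fit-decreasing consolidation loop
#     strategy.post_execute()       # record the efficacy indicators
#     solution = strategy.solution  # migration / service-state actions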