Coverage for watcher/decision_engine/strategy/strategies/workload_stabilization.py: 89%
256 statements
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica LLC
#
# Authors: Alexander Chadin <a.chadin@servionica.ru>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import itertools
import math
import random

import oslo_cache
from oslo_config import cfg
from oslo_log import log
from oslo_utils import units

from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

LOG = log.getLogger(__name__)
CONF = cfg.CONF


def _set_memoize(conf):
    oslo_cache.configure(conf)
    region = oslo_cache.create_region()
    configured_region = oslo_cache.configure_cache_region(conf, region)
    return oslo_cache.core.get_memoization_decorator(conf,
                                                     configured_region,
                                                     'cache')
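
# The decorator built here caches method results (see get_instance_load
# below) in the oslo.cache region configured under the 'cache' config group.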


class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
    """Workload Stabilization control using live migration

    This is a workload stabilization strategy based on the standard
    deviation algorithm. The goal is to determine whether there is an
    overload in the cluster and respond to it by migrating VMs to
    stabilize the cluster.

    This strategy has been tested in a small cluster (32 nodes).

    It assumes that live migrations are possible in your cluster.
    """

    MEMOIZE = _set_memoize(CONF)

    DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage',
                          'instance_ram_usage', 'host_ram_usage']

    def __init__(self, config, osc=None):
        """Workload Stabilization control using live migration

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(WorkloadStabilization, self).__init__(config, osc)
        self.weights = None
        self.metrics = None
        self.thresholds = None
        self.host_choice = None
        self.instance_metrics = None
        self.retry_count = None
        self.periods = None
        self.aggregation_method = None
        self.sd_before_audit = 0
        self.sd_after_audit = 0
        self.instance_migrations_count = 0
        self.instances_count = 0

    @classmethod
    def get_name(cls):
        return "workload_stabilization"

    @classmethod
    def get_display_name(cls):
        return _("Workload stabilization")

    @classmethod
    def get_translatable_display_name(cls):
        return "Workload stabilization"

    @property
    def granularity(self):
        return self.input_parameters.get('granularity', 300)

    @classmethod
    def get_schema(cls):
        return {
            "properties": {
                "metrics": {
                    "description": "Metrics used as rates of cluster loads.",
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["instance_cpu_usage", "instance_ram_usage"]
                    },
                    "default": ["instance_cpu_usage"]
                },
                "thresholds": {
                    "description": "Dict where the key is a metric and "
                                   "the value is a trigger value.",
                    "type": "object",
                    "properties": {
                        "instance_cpu_usage": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        },
                        "instance_ram_usage": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        }
                    },
                    "default": {"instance_cpu_usage": 0.1,
                                "instance_ram_usage": 0.1}
                },
                "weights": {
                    "description": "These weights are used to calculate the "
                                   "common standard deviation. A weight name "
                                   "consists of the meter name and the "
                                   "_weight suffix.",
                    "type": "object",
                    "properties": {
                        "instance_cpu_usage_weight": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        },
                        "instance_ram_usage_weight": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        }
                    },
                    "default": {"instance_cpu_usage_weight": 1.0,
                                "instance_ram_usage_weight": 1.0}
                },
                "instance_metrics": {
                    "description": "Mapping to get hardware statistics "
                                   "using instance metrics.",
                    "type": "object",
                    "default": {"instance_cpu_usage": "host_cpu_usage",
                                "instance_ram_usage": "host_ram_usage"}
                },
                "host_choice": {
                    "description": "Method of host selection. The available "
                                   "methods are cycle, retry and fullsearch. "
                                   "Cycle iterates over the hosts one by "
                                   "one. Retry picks hosts at random (the "
                                   "count is defined by the retry_count "
                                   "option). Fullsearch tries every host "
                                   "in the list.",
                    "type": "string",
                    "default": "retry"
                },
                "retry_count": {
                    "description": "Number of randomly picked hosts "
                                   "returned.",
                    "type": "number",
                    "minimum": 1,
                    "default": 1
                },
                "periods": {
                    "description": "These periods are used to get statistic "
                                   "aggregation for instance and host "
                                   "metrics. The period is simply a repeating"
                                   " interval of time into which the samples"
                                   " are grouped for aggregation. Watcher "
                                   "uses only the last period of all received"
                                   " ones.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "integer",
                            "minimum": 0
                        },
                        "compute_node": {
                            "type": "integer",
                            "minimum": 0
                        },
                        "node": {
                            "type": "integer",
                            # node is deprecated
                            "minimum": 0,
                            "default": 0
                        },
                    },
                    "default": {
                        "instance": 720,
                        "compute_node": 600,
                        # node is deprecated
                        "node": 0,
                    }
                },
                "aggregation_method": {
                    "description": "Function used to aggregate multiple "
                                   "measures into an aggregate. For example, "
                                   "the min aggregation method will aggregate "
                                   "the values of different measures to the "
                                   "minimum value of all the measures in the "
                                   "time range.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "compute_node": {
                            "type": "string",
                            "default": 'mean'
                        },
                        # node is deprecated
                        "node": {
                            "type": "string",
                            "default": ''
                        },
                    },
                    "default": {
                        "instance": 'mean',
                        "compute_node": 'mean',
                        # node is deprecated
                        "node": '',
                    }
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "minimum": 0,
                    "default": 300
                },
            }
        }

    def transform_instance_cpu(self, instance_load, host_vcpus):
        """Transform instance cpu utilization to overall host cpu utilization.

        :param instance_load: dict that contains instance uuid and
            utilization info.
        :param host_vcpus: int
        :return: float value
        """
        return (instance_load['instance_cpu_usage'] *
                (instance_load['vcpus'] / float(host_vcpus)))
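
    # For example (figures are illustrative): an instance reporting 0.5
    # CPU usage across its 2 vCPUs contributes 0.5 * (2 / 8.0) = 0.125
    # to the overall load of an 8-vCPU host.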

    @MEMOIZE
    def get_instance_load(self, instance):
        """Gather instance load through ceilometer/gnocchi statistics.

        :param instance: instance for which statistics are gathered.
        :return: dict
        """
        LOG.debug('Getting load for %s', instance.uuid)
        instance_load = {'uuid': instance.uuid, 'vcpus': instance.vcpus}
        for meter in self.metrics:
            avg_meter = self.datasource_backend.statistic_aggregation(
                instance, 'instance', meter, self.periods['instance'],
                self.aggregation_method['instance'], self.granularity)
            if avg_meter is None:
                LOG.warning(
                    "No values returned by %(resource_id)s "
                    "for %(metric_name)s", dict(
                        resource_id=instance.uuid, metric_name=meter))
                return
            if meter == 'instance_cpu_usage':
                avg_meter /= float(100)
            LOG.debug('Load of %(metric)s for %(instance)s is %(value)s',
                      {'metric': meter,
                       'instance': instance.uuid,
                       'value': avg_meter})
            instance_load[meter] = avg_meter
        return instance_load
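
    # The resulting dict looks like, e.g. (values illustrative):
    #   {'uuid': '<uuid>', 'vcpus': 2, 'instance_cpu_usage': 0.04}
    # A None return signals that the datasource had no values; callers
    # such as calculate_migration_case() treat it as "skip this instance".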

    def normalize_hosts_load(self, hosts):
        normalized_hosts = copy.deepcopy(hosts)
        for host in normalized_hosts:
            if 'instance_ram_usage' in normalized_hosts[host]:
                node = self.compute_model.get_node_by_uuid(host)
                normalized_hosts[host]['instance_ram_usage'] \
                    /= float(node.memory)
        return normalized_hosts

    def get_available_nodes(self):
        nodes = self.compute_model.get_all_compute_nodes().items()
        return {node_uuid: node for node_uuid, node in nodes
                if node.state == element.ServiceState.ONLINE.value and
                node.status == element.ServiceState.ENABLED.value}

    def get_hosts_load(self):
        """Get the load of every available host by gathering instance loads"""
        hosts_load = {}
        for node_id, node in self.get_available_nodes().items():
            hosts_load[node_id] = {}
            hosts_load[node_id]['vcpus'] = node.vcpus
            LOG.debug('Getting load for %s', node_id)
            for metric in self.metrics:
                avg_meter = None
                meter_name = self.instance_metrics[metric]
                avg_meter = self.datasource_backend.statistic_aggregation(
                    node, 'compute_node', self.instance_metrics[metric],
                    self.periods['compute_node'],
                    self.aggregation_method['compute_node'], self.granularity)
                if avg_meter is None:
                    LOG.warning('No values returned by node %s for %s',
                                node_id, meter_name)
                    del hosts_load[node_id]
                    break
                else:
                    if meter_name == 'host_ram_usage':
                        avg_meter /= units.Ki
                    if meter_name == 'host_cpu_usage':
                        avg_meter /= 100
                    LOG.debug('Load of %(metric)s for %(node)s is %(value)s',
                              {'metric': metric,
                               'node': node_id,
                               'value': avg_meter})
                    hosts_load[node_id][metric] = avg_meter
        return hosts_load
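
    # hosts_load maps each node uuid to its vcpu count and per-metric
    # load, e.g. {'node-1': {'vcpus': 8, 'instance_cpu_usage': 0.2}}
    # (names and values illustrative).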

    def get_sd(self, hosts, meter_name):
        """Get standard deviation among hosts by specified meter"""
        mean = 0
        variation = 0
        num_hosts = len(hosts)
        if num_hosts == 0:
            return 0
        for host_id in hosts:
            mean += hosts[host_id][meter_name]
        mean /= num_hosts
        for host_id in hosts:
            variation += (hosts[host_id][meter_name] - mean) ** 2
        variation /= num_hosts
        sd = math.sqrt(variation)
        return sd
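
    # get_sd() computes the population standard deviation
    #   sd = sqrt(sum((x_i - mean) ** 2) / N)
    # of meter_name over the N available hosts.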

    def calculate_weighted_sd(self, sd_case):
        """Calculate common standard deviation among meters on host"""
        weighted_sd = 0
        for metric, value in zip(self.metrics, sd_case):
            try:
                weighted_sd += value * float(self.weights[metric + '_weight'])
            except KeyError as exc:
                LOG.exception(exc)
                raise exception.WatcherException(
                    _("Incorrect mapping: could not find associated weight"
                      " for %s in weight dict.") % metric)
        return weighted_sd
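
    # The weighted sd is sum(sd_metric * weight_metric) over the
    # configured metrics; e.g. with instance_cpu_usage_weight = 1.0, a
    # CPU sd of 0.15 contributes 0.15 to the total.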

    def calculate_migration_case(self, hosts, instance, src_node, dst_node):
        """Calculate migration case

        Return the list of standard deviation values that would result
        from migrating the instance from the source node to the
        destination node.
        :param hosts: hosts with their workload
        :param instance: the virtual machine
        :param src_node: the source node
        :param dst_node: the destination node
        :return: list of standard deviation values
        """
        migration_case = []
        new_hosts = copy.deepcopy(hosts)
        instance_load = self.get_instance_load(instance)
        if not instance_load:
            return
        s_host_vcpus = new_hosts[src_node.uuid]['vcpus']
        d_host_vcpus = new_hosts[dst_node.uuid]['vcpus']
        for metric in self.metrics:
            if metric == 'instance_cpu_usage':
                new_hosts[src_node.uuid][metric] -= (
                    self.transform_instance_cpu(instance_load, s_host_vcpus))
                new_hosts[dst_node.uuid][metric] += (
                    self.transform_instance_cpu(instance_load, d_host_vcpus))
            else:
                new_hosts[src_node.uuid][metric] -= instance_load[metric]
                new_hosts[dst_node.uuid][metric] += instance_load[metric]
        normalized_hosts = self.normalize_hosts_load(new_hosts)
        for metric in self.metrics:
            migration_case.append(self.get_sd(normalized_hosts, metric))
        migration_case.append(new_hosts)
        return migration_case
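
    # The returned list holds one sd value per metric followed by the
    # simulated hosts load, e.g. [sd_cpu, sd_ram, new_hosts]; callers
    # read the sd values via sd_case[:-1] and the load via sd_case[-1].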

    def get_current_weighted_sd(self, hosts_load):
        """Calculate current weighted sd"""
        current_sd = []
        normalized_load = self.normalize_hosts_load(hosts_load)
        for metric in self.metrics:
            metric_sd = self.get_sd(normalized_load, metric)
            current_sd.append(metric_sd)
        current_sd.append(hosts_load)
        return self.calculate_weighted_sd(current_sd[:-1])
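
    # current_sd mirrors the calculate_migration_case() layout (sd
    # values followed by the hosts load); the trailing hosts_load
    # element is dropped again via current_sd[:-1] before weighting.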

    def simulate_migrations(self, hosts):
        """Make a sorted list of instance:dst_host pairs"""
        def yield_nodes(nodes):
            if self.host_choice == 'cycle':
                for i in itertools.cycle(nodes):
                    yield [i]
            if self.host_choice == 'retry':
                while True:
                    yield random.sample(nodes, self.retry_count)
            if self.host_choice == 'fullsearch':
                while True:
                    yield nodes
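
        # yield_nodes is an infinite generator: 'cycle' yields one host
        # per call, 'retry' yields retry_count random hosts, and
        # 'fullsearch' yields the whole candidate list every time.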

        instance_host_map = []
        nodes = sorted(list(self.get_available_nodes()))
        current_weighted_sd = self.get_current_weighted_sd(hosts)
        for src_host in nodes:
            src_node = self.compute_model.get_node_by_uuid(src_host)
            c_nodes = copy.copy(nodes)
            c_nodes.remove(src_host)
            node_list = yield_nodes(c_nodes)
            for instance in self.compute_model.get_node_instances(src_node):
                # NOTE: skip instances excluded by scope when migrating
                if instance.watcher_exclude:
                    LOG.debug("Instance is excluded by scope, "
                              "skipped: %s", instance.uuid)
                    continue
                if instance.state not in [element.InstanceState.ACTIVE.value,
                                          element.InstanceState.PAUSED.value]:
                    continue
                min_sd_case = {'value': current_weighted_sd}
                for dst_host in next(node_list):
                    dst_node = self.compute_model.get_node_by_uuid(dst_host)
                    sd_case = self.calculate_migration_case(
                        hosts, instance, src_node, dst_node)
                    if sd_case is None:
                        break

                    weighted_sd = self.calculate_weighted_sd(sd_case[:-1])

                    if weighted_sd < min_sd_case['value']:
                        min_sd_case = {
                            'host': dst_node.uuid, 'value': weighted_sd,
                            's_host': src_node.uuid, 'instance': instance.uuid}
                        instance_host_map.append(min_sd_case)
                if sd_case is None:
                    continue
        return sorted(instance_host_map, key=lambda x: x['value'])
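
    # Each entry of the returned list looks like
    #   {'instance': <uuid>, 's_host': <uuid>, 'host': <uuid>,
    #    'value': <weighted sd>},
    # sorted so that the migration yielding the lowest weighted sd
    # comes first.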

    def check_threshold(self):
        """Check if the cluster needs balancing"""
        hosts_load = self.get_hosts_load()
        normalized_load = self.normalize_hosts_load(hosts_load)
        for metric in self.metrics:
            metric_sd = self.get_sd(normalized_load, metric)
            LOG.info("Standard deviation for %(metric)s is %(sd)s.",
                     {'metric': metric, 'sd': metric_sd})
            if metric_sd > float(self.thresholds[metric]):
                LOG.info("Standard deviation %(sd)s of %(metric)s exceeds "
                         "the threshold %(threshold)s.",
                         {'metric': metric,
                          'threshold': float(self.thresholds[metric]),
                          'sd': metric_sd})
                LOG.info("Launching workload optimization...")
                self.sd_before_audit = metric_sd
                return self.simulate_migrations(hosts_load)

    def create_migration_instance(self, mig_instance, mig_source_node,
                                  mig_destination_node):
        """Create a live migration action for the VM"""
        if self.compute_model.migrate_instance(
                mig_instance, mig_source_node, mig_destination_node):
            self.add_action_migrate(mig_instance, 'live',
                                    mig_source_node,
                                    mig_destination_node)
            self.instance_migrations_count += 1

    def migrate(self, instance_uuid, src_host, dst_host):
        mig_instance = self.compute_model.get_instance_by_uuid(instance_uuid)
        mig_source_node = self.compute_model.get_node_by_uuid(
            src_host)
        mig_destination_node = self.compute_model.get_node_by_uuid(
            dst_host)
        self.create_migration_instance(mig_instance, mig_source_node,
                                       mig_destination_node)

    def fill_solution(self):
        self.solution.model = self.compute_model
        return self.solution

    def pre_execute(self):
        self._pre_execute()
        self.weights = self.input_parameters.weights
        self.metrics = self.input_parameters.metrics
        self.thresholds = self.input_parameters.thresholds
        self.host_choice = self.input_parameters.host_choice
        self.instance_metrics = self.input_parameters.instance_metrics
        self.retry_count = self.input_parameters.retry_count
        self.periods = self.input_parameters.periods
        self.aggregation_method = self.input_parameters.aggregation_method

        # backwards compatibility for the node parameter of
        # aggregation_method.
        if self.aggregation_method['node']:
            LOG.warning('The node parameter has been renamed to compute_node '
                        'and will be removed in the next release.')
            self.aggregation_method['compute_node'] = \
                self.aggregation_method['node']

        # backwards compatibility for the node parameter of periods.
        if self.periods['node'] != 0:
            LOG.warning('The node parameter has been renamed to compute_node '
                        'and will be removed in the next release.')
            self.periods['compute_node'] = self.periods['node']

    def do_execute(self, audit=None):
        migration = self.check_threshold()
        if migration:
            hosts_load = self.get_hosts_load()
            min_sd = 1
            balanced = False
            for instance_host in migration:
                instance = self.compute_model.get_instance_by_uuid(
                    instance_host['instance'])
                src_node = self.compute_model.get_node_by_uuid(
                    instance_host['s_host'])
                dst_node = self.compute_model.get_node_by_uuid(
                    instance_host['host'])
                if instance.disk > dst_node.disk:
                    continue
                instance_load = self.calculate_migration_case(
                    hosts_load, instance, src_node, dst_node)
                weighted_sd = self.calculate_weighted_sd(instance_load[:-1])
                if weighted_sd < min_sd:
                    min_sd = weighted_sd
                    hosts_load = instance_load[-1]
                    LOG.info("Migration of %(instance_uuid)s from %(s_host)s "
                             "to %(host)s reduces the standard deviation to "
                             "%(min_sd)s.",
                             {'instance_uuid': instance_host['instance'],
                              's_host': instance_host['s_host'],
                              'host': instance_host['host'],
                              'min_sd': min_sd})
                    self.migrate(instance_host['instance'],
                                 instance_host['s_host'],
                                 instance_host['host'])
                    self.sd_after_audit = min_sd

                for metric, value in zip(self.metrics, instance_load[:-1]):
                    if value < float(self.thresholds[metric]):
                        LOG.info("At least one metric value fell below its "
                                 "threshold. Workload Stabilization has "
                                 "successfully completed the optimization "
                                 "process.")
                        balanced = True
                        break
                if balanced:
                    break

    def post_execute(self):
        """Post-execution phase

        This can be used to compute the global efficacy
        """
        self.fill_solution()

        self.solution.set_efficacy_indicators(
            instance_migrations_count=self.instance_migrations_count,
            standard_deviation_before_audit=self.sd_before_audit,
            standard_deviation_after_audit=self.sd_after_audit,
            instances_count=len(self.compute_model.get_all_instances()),
        )

        LOG.debug(self.compute_model.to_string())