Coverage for watcher/decision_engine/strategy/strategies/workload_stabilization.py: 89%

256 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 Servionica LLC 

3# 

4# Authors: Alexander Chadin <a.chadin@servionica.ru> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# 

19 

import copy
import itertools
import math
import random

import oslo_cache
from oslo_config import cfg
from oslo_log import log
import oslo_utils
# Explicitly load the ``units`` submodule: ``import oslo_utils`` by itself
# does not import it, but this module relies on ``oslo_utils.units.Ki``.
from oslo_utils import units  # noqa: F401

from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

34 

# Module-level logger for this strategy.
LOG = log.getLogger(__name__)
# Global oslo.config handle; read at class-definition time by _set_memoize().
CONF = cfg.CONF

37 

38 

def _set_memoize(conf):
    """Build a memoization decorator backed by an oslo.cache region.

    :param conf: oslo.config configuration object
    :return: decorator caching results under the 'cache' config group
    """
    oslo_cache.configure(conf)
    cache_region = oslo_cache.configure_cache_region(
        conf, oslo_cache.create_region())
    return oslo_cache.core.get_memoization_decorator(
        conf, cache_region, 'cache')

46 

47 

class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
    """Workload Stabilization control using live migration

    This is workload stabilization strategy based on standard deviation
    algorithm. The goal is to determine if there is an overload in a cluster
    and respond to it by migrating VMs to stabilize the cluster.

    This strategy has been tested in a small (32 nodes) cluster.

    It assumes that live migrations are possible in your cluster.
    """

    # Memoization decorator created once at class-definition time from the
    # global CONF; used to cache per-instance load lookups
    # (see get_instance_load).
    MEMOIZE = _set_memoize(CONF)

    # Datasource metric names this strategy may query.
    DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage',
                          'instance_ram_usage', 'host_ram_usage']

64 

65 def __init__(self, config, osc=None): 

66 """Workload Stabilization control using live migration 

67 

68 :param config: A mapping containing the configuration of this strategy 

69 :type config: :py:class:`~.Struct` instance 

70 :param osc: :py:class:`~.OpenStackClients` instance 

71 """ 

72 super(WorkloadStabilization, self).__init__(config, osc) 

73 self.weights = None 

74 self.metrics = None 

75 self.thresholds = None 

76 self.host_choice = None 

77 self.instance_metrics = None 

78 self.retry_count = None 

79 self.periods = None 

80 self.aggregation_method = None 

81 self.sd_before_audit = 0 

82 self.sd_after_audit = 0 

83 self.instance_migrations_count = 0 

84 self.instances_count = 0 

85 

    @classmethod
    def get_name(cls):
        """Return the unique identifier used to register this strategy."""
        return "workload_stabilization"

89 

    @classmethod
    def get_display_name(cls):
        """Return the translated human-readable name of this strategy."""
        return _("Workload stabilization")

93 

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated display name (translation message id)."""
        return "Workload stabilization"

97 

    @property
    def granularity(self):
        # Seconds between two datapoints of an aggregated timeseries;
        # 300 matches the schema default for the 'granularity' parameter.
        return self.input_parameters.get('granularity', 300)

101 

    @classmethod
    def get_schema(cls):
        """Return the JSON schema describing this strategy's parameters."""
        return {
            "properties": {
                "metrics": {
                    "description": "Metrics used as rates of cluster loads.",
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["instance_cpu_usage", "instance_ram_usage"]
                    },
                    "default": ["instance_cpu_usage"]
                },
                "thresholds": {
                    "description": "Dict where key is a metric and value "
                                   "is a trigger value.",
                    "type": "object",
                    "properties": {
                        "instance_cpu_usage": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        },
                        "instance_ram_usage": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        }
                    },
                    "default": {"instance_cpu_usage": 0.1,
                                "instance_ram_usage": 0.1}
                },
                "weights": {
                    "description": "These weights used to calculate "
                                   "common standard deviation. Name of weight"
                                   " contains meter name and _weight suffix.",
                    "type": "object",
                    "properties": {
                        "instance_cpu_usage_weight": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        },
                        "instance_ram_usage_weight": {
                            "type": "number",
                            "minimum": 0,
                            "maximum": 1
                        }
                    },
                    "default": {"instance_cpu_usage_weight": 1.0,
                                "instance_ram_usage_weight": 1.0}
                },
                "instance_metrics": {
                    "description": "Mapping to get hardware statistics using"
                                   " instance metrics",
                    "type": "object",
                    "default": {"instance_cpu_usage": "host_cpu_usage",
                                "instance_ram_usage": "host_ram_usage"}
                },
                "host_choice": {
                    "description": "Method of host's choice. There are cycle,"
                                   " retry and fullsearch methods. "
                                   "Cycle will iterate hosts in cycle. "
                                   "Retry will get some hosts random "
                                   "(count defined in retry_count option). "
                                   "Fullsearch will return each host "
                                   "from list.",
                    "type": "string",
                    "default": "retry"
                },
                "retry_count": {
                    "description": "Count of random returned hosts",
                    "type": "number",
                    "minimum": 1,
                    "default": 1
                },
                "periods": {
                    "description": "These periods are used to get statistic "
                                   "aggregation for instance and host "
                                   "metrics. The period is simply a repeating"
                                   " interval of time into which the samples"
                                   " are grouped for aggregation. Watcher "
                                   "uses only the last period of all received"
                                   " ones.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "integer",
                            "minimum": 0
                        },
                        "compute_node": {
                            "type": "integer",
                            "minimum": 0
                        },
                        "node": {
                            "type": "integer",
                            # node is deprecated
                            "minimum": 0,
                            "default": 0
                        },
                    },
                    "default": {
                        "instance": 720,
                        "compute_node": 600,
                        # node is deprecated
                        "node": 0,
                    }
                },
                "aggregation_method": {
                    "description": "Function used to aggregate multiple "
                                   "measures into an aggregate. For example, "
                                   "the min aggregation method will aggregate "
                                   "the values of different measures to the "
                                   "minimum value of all the measures in the "
                                   "time range.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "compute_node": {
                            "type": "string",
                            "default": 'mean'
                        },
                        # node is deprecated
                        "node": {
                            "type": "string",
                            "default": ''
                        },
                    },
                    "default": {
                        "instance": 'mean',
                        "compute_node": 'mean',
                        # node is deprecated
                        "node": '',
                    }
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "minimum": 0,
                    "default": 300
                },
            }
        }

249 

250 def transform_instance_cpu(self, instance_load, host_vcpus): 

251 """Transform instance cpu utilization to overall host cpu utilization. 

252 

253 :param instance_load: dict that contains instance uuid and 

254 utilization info. 

255 :param host_vcpus: int 

256 :return: float value 

257 """ 

258 return (instance_load['instance_cpu_usage'] * 

259 (instance_load['vcpus'] / float(host_vcpus))) 

260 

    @MEMOIZE
    def get_instance_load(self, instance):
        """Gathering instance load through ceilometer/gnocchi statistic.

        :param instance: instance for which statistic is gathered.
        :return: dict with 'uuid', 'vcpus' and one entry per metric in
            self.metrics, or None when any metric has no datapoints.
        """
        LOG.debug('Getting load for %s', instance.uuid)
        instance_load = {'uuid': instance.uuid, 'vcpus': instance.vcpus}
        for meter in self.metrics:
            avg_meter = self.datasource_backend.statistic_aggregation(
                instance, 'instance', meter, self.periods['instance'],
                self.aggregation_method['instance'], self.granularity)
            if avg_meter is None:
                LOG.warning(
                    "No values returned by %(resource_id)s "
                    "for %(metric_name)s", dict(
                        resource_id=instance.uuid, metric_name=meter))
                # Abort with None: callers treat a falsy result as "no data".
                return
            if meter == 'instance_cpu_usage':
                # CPU usage arrives as a percentage; normalize to [0, 1].
                avg_meter /= float(100)
            LOG.debug('Load of %(metric)s for %(instance)s is %(value)s',
                      {'metric': meter,
                       'instance': instance.uuid,
                       'value': avg_meter})
            instance_load[meter] = avg_meter
        return instance_load

288 

289 def normalize_hosts_load(self, hosts): 

290 normalized_hosts = copy.deepcopy(hosts) 

291 for host in normalized_hosts: 

292 if 'instance_ram_usage' in normalized_hosts[host]: 292 ↛ 291line 292 didn't jump to line 291 because the condition on line 292 was always true

293 node = self.compute_model.get_node_by_uuid(host) 

294 normalized_hosts[host]['instance_ram_usage'] \ 

295 /= float(node.memory) 

296 

297 return normalized_hosts 

298 

299 def get_available_nodes(self): 

300 nodes = self.compute_model.get_all_compute_nodes().items() 

301 return {node_uuid: node for node_uuid, node in nodes 

302 if node.state == element.ServiceState.ONLINE.value and 

303 node.status == element.ServiceState.ENABLED.value} 

304 

305 def get_hosts_load(self): 

306 """Get load of every available host by gathering instances load""" 

307 hosts_load = {} 

308 for node_id, node in self.get_available_nodes().items(): 

309 hosts_load[node_id] = {} 

310 hosts_load[node_id]['vcpus'] = node.vcpus 

311 LOG.debug('Getting load for %s', node_id) 

312 for metric in self.metrics: 

313 avg_meter = None 

314 meter_name = self.instance_metrics[metric] 

315 avg_meter = self.datasource_backend.statistic_aggregation( 

316 node, 'compute_node', self.instance_metrics[metric], 

317 self.periods['compute_node'], 

318 self.aggregation_method['compute_node'], self.granularity) 

319 if avg_meter is None: 

320 LOG.warning('No values returned by node %s for %s', 

321 node_id, meter_name) 

322 del hosts_load[node_id] 

323 break 

324 else: 

325 if meter_name == 'host_ram_usage': 

326 avg_meter /= oslo_utils.units.Ki 

327 if meter_name == 'host_cpu_usage': 

328 avg_meter /= 100 

329 LOG.debug('Load of %(metric)s for %(node)s is %(value)s', 

330 {'metric': metric, 

331 'node': node_id, 

332 'value': avg_meter}) 

333 hosts_load[node_id][metric] = avg_meter 

334 return hosts_load 

335 

336 def get_sd(self, hosts, meter_name): 

337 """Get standard deviation among hosts by specified meter""" 

338 mean = 0 

339 variation = 0 

340 num_hosts = len(hosts) 

341 if num_hosts == 0: 341 ↛ 342line 341 didn't jump to line 342 because the condition on line 341 was never true

342 return 0 

343 for host_id in hosts: 

344 mean += hosts[host_id][meter_name] 

345 mean /= num_hosts 

346 for host_id in hosts: 

347 variation += (hosts[host_id][meter_name] - mean) ** 2 

348 variation /= num_hosts 

349 sd = math.sqrt(variation) 

350 return sd 

351 

352 def calculate_weighted_sd(self, sd_case): 

353 """Calculate common standard deviation among meters on host""" 

354 weighted_sd = 0 

355 for metric, value in zip(self.metrics, sd_case): 

356 try: 

357 weighted_sd += value * float(self.weights[metric + '_weight']) 

358 except KeyError as exc: 

359 LOG.exception(exc) 

360 raise exception.WatcherException( 

361 _("Incorrect mapping: could not find associated weight" 

362 " for %s in weight dict.") % metric) 

363 return weighted_sd 

364 

365 def calculate_migration_case(self, hosts, instance, src_node, dst_node): 

366 """Calculate migration case 

367 

368 Return list of standard deviation values, that appearing in case of 

369 migration of instance from source host to destination host 

370 :param hosts: hosts with their workload 

371 :param instance: the virtual machine 

372 :param src_node: the source node 

373 :param dst_node: the destination node 

374 :return: list of standard deviation values 

375 """ 

376 migration_case = [] 

377 new_hosts = copy.deepcopy(hosts) 

378 instance_load = self.get_instance_load(instance) 

379 if not instance_load: 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true

380 return 

381 s_host_vcpus = new_hosts[src_node.uuid]['vcpus'] 

382 d_host_vcpus = new_hosts[dst_node.uuid]['vcpus'] 

383 for metric in self.metrics: 

384 if metric == 'instance_cpu_usage': 

385 new_hosts[src_node.uuid][metric] -= ( 

386 self.transform_instance_cpu(instance_load, s_host_vcpus)) 

387 new_hosts[dst_node.uuid][metric] += ( 

388 self.transform_instance_cpu(instance_load, d_host_vcpus)) 

389 else: 

390 new_hosts[src_node.uuid][metric] -= instance_load[metric] 

391 new_hosts[dst_node.uuid][metric] += instance_load[metric] 

392 normalized_hosts = self.normalize_hosts_load(new_hosts) 

393 for metric in self.metrics: 

394 migration_case.append(self.get_sd(normalized_hosts, metric)) 

395 migration_case.append(new_hosts) 

396 return migration_case 

397 

398 def get_current_weighted_sd(self, hosts_load): 

399 """Calculate current weighted sd""" 

400 current_sd = [] 

401 normalized_load = self.normalize_hosts_load(hosts_load) 

402 for metric in self.metrics: 

403 metric_sd = self.get_sd(normalized_load, metric) 

404 current_sd.append(metric_sd) 

405 current_sd.append(hosts_load) 

406 return self.calculate_weighted_sd(current_sd[:-1]) 

407 

408 def simulate_migrations(self, hosts): 

409 """Make sorted list of pairs instance:dst_host""" 

410 def yield_nodes(nodes): 

411 if self.host_choice == 'cycle': 411 ↛ 412line 411 didn't jump to line 412 because the condition on line 411 was never true

412 for i in itertools.cycle(nodes): 

413 yield [i] 

414 if self.host_choice == 'retry': 414 ↛ 415line 414 didn't jump to line 415 because the condition on line 414 was never true

415 while True: 

416 yield random.sample(nodes, self.retry_count) 

417 if self.host_choice == 'fullsearch': 417 ↛ exitline 417 didn't return from function 'yield_nodes' because the condition on line 417 was always true

418 while True: 

419 yield nodes 

420 

421 instance_host_map = [] 

422 nodes = sorted(list(self.get_available_nodes())) 

423 current_weighted_sd = self.get_current_weighted_sd(hosts) 

424 for src_host in nodes: 

425 src_node = self.compute_model.get_node_by_uuid(src_host) 

426 c_nodes = copy.copy(nodes) 

427 c_nodes.remove(src_host) 

428 node_list = yield_nodes(c_nodes) 

429 for instance in self.compute_model.get_node_instances(src_node): 

430 # NOTE: skip exclude instance when migrating 

431 if instance.watcher_exclude: 

432 LOG.debug("Instance is excluded by scope, " 

433 "skipped: %s", instance.uuid) 

434 continue 

435 if instance.state not in [element.InstanceState.ACTIVE.value, 435 ↛ 437line 435 didn't jump to line 437 because the condition on line 435 was never true

436 element.InstanceState.PAUSED.value]: 

437 continue 

438 min_sd_case = {'value': current_weighted_sd} 

439 for dst_host in next(node_list): 

440 dst_node = self.compute_model.get_node_by_uuid(dst_host) 

441 sd_case = self.calculate_migration_case( 

442 hosts, instance, src_node, dst_node) 

443 if sd_case is None: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true

444 break 

445 

446 weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) 

447 

448 if weighted_sd < min_sd_case['value']: 

449 min_sd_case = { 

450 'host': dst_node.uuid, 'value': weighted_sd, 

451 's_host': src_node.uuid, 'instance': instance.uuid} 

452 instance_host_map.append(min_sd_case) 

453 if sd_case is None: 453 ↛ 454line 453 didn't jump to line 454 because the condition on line 453 was never true

454 continue 

455 return sorted(instance_host_map, key=lambda x: x['value']) 

456 

    def check_threshold(self):
        """Check if cluster is needed in balancing

        :return: sorted candidate migrations (see simulate_migrations) as
            soon as one metric's standard deviation exceeds its threshold;
            implicitly None when no threshold is exceeded.
        """
        hosts_load = self.get_hosts_load()
        normalized_load = self.normalize_hosts_load(hosts_load)
        for metric in self.metrics:
            metric_sd = self.get_sd(normalized_load, metric)
            LOG.info("Standard deviation for %(metric)s is %(sd)s.",
                     {'metric': metric, 'sd': metric_sd})
            if metric_sd > float(self.thresholds[metric]):
                LOG.info("Standard deviation of %(metric)s exceeds"
                         " appropriate threshold %(threshold)s by %(sd)s.",
                         {'metric': metric,
                          'threshold': float(self.thresholds[metric]),
                          'sd': metric_sd})
                LOG.info("Launching workload optimization...")
                # Remember the triggering sd for efficacy reporting
                # in post_execute().
                self.sd_before_audit = metric_sd
                return self.simulate_migrations(hosts_load)

474 

475 def create_migration_instance(self, mig_instance, mig_source_node, 

476 mig_destination_node): 

477 """Create migration VM""" 

478 if self.compute_model.migrate_instance( 

479 mig_instance, mig_source_node, mig_destination_node): 

480 self.add_action_migrate(mig_instance, 'live', 

481 mig_source_node, 

482 mig_destination_node) 

483 self.instance_migrations_count += 1 

484 

485 def migrate(self, instance_uuid, src_host, dst_host): 

486 mig_instance = self.compute_model.get_instance_by_uuid(instance_uuid) 

487 mig_source_node = self.compute_model.get_node_by_uuid( 

488 src_host) 

489 mig_destination_node = self.compute_model.get_node_by_uuid( 

490 dst_host) 

491 self.create_migration_instance(mig_instance, mig_source_node, 

492 mig_destination_node) 

493 

    def fill_solution(self):
        """Attach the (mutated) compute model to the solution and return it."""
        self.solution.model = self.compute_model
        return self.solution

497 

498 def pre_execute(self): 

499 self._pre_execute() 

500 self.weights = self.input_parameters.weights 

501 self.metrics = self.input_parameters.metrics 

502 self.thresholds = self.input_parameters.thresholds 

503 self.host_choice = self.input_parameters.host_choice 

504 self.instance_metrics = self.input_parameters.instance_metrics 

505 self.retry_count = self.input_parameters.retry_count 

506 self.periods = self.input_parameters.periods 

507 self.aggregation_method = self.input_parameters.aggregation_method 

508 

509 # backwards compatibility for node parameter with aggregate. 

510 if self.aggregation_method['node']: 

511 LOG.warning('Parameter node has been renamed to compute_node and ' 

512 'will be removed in next release.') 

513 self.aggregation_method['compute_node'] = \ 

514 self.aggregation_method['node'] 

515 

516 # backwards compatibility for node parameter with period. 

517 if self.periods['node'] != 0: 

518 LOG.warning('Parameter node has been renamed to compute_node and ' 

519 'will be removed in next release.') 

520 self.periods['compute_node'] = self.periods['node'] 

521 

    def do_execute(self, audit=None):
        """Build the migration action plan for one audit.

        Applies candidate migrations (best first) to a simulated copy of
        the cluster load until at least one metric's standard deviation
        drops below its threshold.
        """
        migration = self.check_threshold()
        if migration:
            hosts_load = self.get_hosts_load()
            min_sd = 1
            balanced = False
            # NOTE(review): candidates that found no improving destination
            # carry only a 'value' key — the 'instance' access below would
            # raise KeyError for them; confirm such entries cannot reach
            # this loop in practice.
            for instance_host in migration:
                instance = self.compute_model.get_instance_by_uuid(
                    instance_host['instance'])
                src_node = self.compute_model.get_node_by_uuid(
                    instance_host['s_host'])
                dst_node = self.compute_model.get_node_by_uuid(
                    instance_host['host'])
                # Skip migrations that cannot fit on the destination's disk.
                if instance.disk > dst_node.disk:
                    continue
                instance_load = self.calculate_migration_case(
                    hosts_load, instance, src_node, dst_node)
                # Last element is the simulated hosts load; the rest are
                # per-metric standard deviations.
                weighted_sd = self.calculate_weighted_sd(instance_load[:-1])
                if weighted_sd < min_sd:
                    min_sd = weighted_sd
                    # Adopt the simulated load as the new baseline for the
                    # next candidates.
                    hosts_load = instance_load[-1]
                    LOG.info("Migration of %(instance_uuid)s from %(s_host)s "
                             "to %(host)s reduces standard deviation to "
                             "%(min_sd)s.",
                             {'instance_uuid': instance_host['instance'],
                              's_host': instance_host['s_host'],
                              'host': instance_host['host'],
                              'min_sd': min_sd})
                    self.migrate(instance_host['instance'],
                                 instance_host['s_host'],
                                 instance_host['host'])
                    self.sd_after_audit = min_sd

                # Stop as soon as any metric's simulated sd is below its
                # threshold — the cluster is considered balanced.
                for metric, value in zip(self.metrics, instance_load[:-1]):
                    if value < float(self.thresholds[metric]):
                        LOG.info("At least one of metrics' values fell "
                                 "below the threshold values. "
                                 "Workload Stabilization has successfully "
                                 "completed optimization process.")
                        balanced = True
                        break
                if balanced:
                    break

565 

    def post_execute(self):
        """Post-execution phase

        This can be used to compute the global efficacy
        """
        self.fill_solution()

        # Report efficacy indicators gathered during do_execute().
        self.solution.set_efficacy_indicators(
            instance_migrations_count=self.instance_migrations_count,
            standard_deviation_before_audit=self.sd_before_audit,
            standard_deviation_after_audit=self.sd_after_audit,
            # Counted directly from the model; the self.instances_count
            # attribute set in __init__ is never updated.
            instances_count=len(self.compute_model.get_all_instances()),
        )

        LOG.debug(self.compute_model.to_string())