Coverage for watcher/decision_engine/strategy/strategies/basic_consolidation.py: 93%

165 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# 

4# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# 

19 

20from oslo_config import cfg 

21from oslo_log import log 

22 

23from watcher._i18n import _ 

24from watcher.decision_engine.model import element 

25from watcher.decision_engine.strategy.strategies import base 

26 

27LOG = log.getLogger(__name__) 

28 

29 

class BasicConsolidation(base.ServerConsolidationBaseStrategy):
    """Good server consolidation strategy

    Basic offline consolidation using live migration

    Consolidation of VMs is essential to achieve energy optimization in cloud
    environments such as OpenStack. As VMs are spun up and/or moved over
    time, it becomes necessary to migrate VMs among servers to lower the
    costs. However, migration of VMs introduces runtime overheads and
    consumes extra energy, thus a good server consolidation strategy should
    carefully plan for migration in order to both minimize energy consumption
    and comply to the various SLAs.

    This algorithm not only minimizes the overall number of used servers,
    but also minimizes the number of migrations.

    It has been developed only for tests. You must have at least 2 physical
    compute nodes to run it, so you can easily run it on DevStack. It assumes
    that live migration is possible on your OpenStack cluster.
    """

    # Metrics this strategy requires from the configured datasource backend.
    DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage']

    # Action type emitted to disable a compute node once it has been emptied.
    CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"

    def __init__(self, config, osc=None):
        """Basic offline Consolidation using live migration

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(BasicConsolidation, self).__init__(config, osc)

        # set default value for the number of enabled compute nodes
        self.number_of_enabled_nodes = 0
        # set default value for the number of released nodes
        self.number_of_released_nodes = 0
        # set default value for the number of migrations
        self.number_of_migrations = 0

        # set default value for the efficacy
        self.efficacy = 100

        # TODO(jed): improve threshold overbooking?
        # A ratio of 1 means no overbooking is allowed on the destination
        # node for the corresponding resource (see check_threshold()).
        self.threshold_mem = 1
        self.threshold_disk = 1
        self.threshold_cores = 1

    @classmethod
    def get_name(cls):
        """Return the unique strategy identifier."""
        return "basic"

    @property
    def migration_attempts(self):
        # 0 (the default) means no limit on the number of candidate
        # combinations tried before giving up — see do_execute().
        return self.input_parameters.get('migration_attempts', 0)

    @property
    def period(self):
        # Statistic aggregation window, in seconds.
        return self.input_parameters.get('period', 7200)

    @property
    def granularity(self):
        # Time between two measures of an aggregated timeseries, in seconds.
        return self.input_parameters.get('granularity', 300)

    @property
    def aggregation_method(self):
        # 'node' is the deprecated spelling of 'compute_node'; backwards
        # compatibility is handled in pre_execute().
        return self.input_parameters.get(
            'aggregation_method', {
                "instance": 'mean',
                "compute_node": 'mean',
                "node": ''
            }
        )

    @classmethod
    def get_display_name(cls):
        """Return the translated human-readable strategy name."""
        return _("Basic offline consolidation")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated human-readable strategy name."""
        return "Basic offline consolidation"

    @classmethod
    def get_schema(cls):
        """Return the JSON schema of the strategy input parameters."""
        # Mandatory default setting for each element
        return {
            "properties": {
                "migration_attempts": {
                    "description": "Maximum number of combinations to be "
                                   "tried by the strategy while searching "
                                   "for potential candidates. To remove the "
                                   "limit, set it to 0 (by default)",
                    "type": "number",
                    "default": 0
                },
                "period": {
                    "description": "The time interval in seconds for "
                                   "getting statistic aggregation",
                    "type": "number",
                    "default": 7200
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "default": 300
                },
                "aggregation_method": {
                    "description": "Function used to aggregate multiple "
                                   "measures into an aggregate. For example, "
                                   "the min aggregation method will aggregate "
                                   "the values of different measures to the "
                                   "minimum value of all the measures in the "
                                   "time range.",
                    "type": "object",
                    "properties": {
                        "instance": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "compute_node": {
                            "type": "string",
                            "default": 'mean'
                        },
                        "node": {
                            "type": "string",
                            # node is deprecated
                            "default": ''
                        },
                    },
                    "default": {
                        "instance": 'mean',
                        "compute_node": 'mean',
                        # node is deprecated
                        "node": '',
                    }
                },
            },
        }

    @classmethod
    def get_config_opts(cls):
        """Return the strategy oslo.config options."""
        return super(BasicConsolidation, cls).get_config_opts() + [
            cfg.BoolOpt(
                'check_optimize_metadata',
                help='Check optimize metadata field in instance before'
                     ' migration',
                default=False),
        ]

    def get_available_compute_nodes(self):
        """Return the online nodes eligible for consolidation, keyed by UUID.

        A node is eligible when it is ONLINE and its service status is
        either ENABLED or DISABLED.
        """
        default_node_scope = [element.ServiceState.ENABLED.value,
                              element.ServiceState.DISABLED.value]
        return {uuid: cn for uuid, cn in
                self.compute_model.get_all_compute_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def check_migration(self, source_node, destination_node,
                        instance_to_migrate):
        """Check if the migration is possible

        :param source_node: the current node of the virtual machine
        :param destination_node: the destination of the virtual machine
        :param instance_to_migrate: the instance / virtual machine
        :return: True if there is enough room otherwise false
        """
        if source_node == destination_node:
            return False

        LOG.debug('Migrate instance %s from %s to %s',
                  instance_to_migrate, source_node, destination_node)

        used_resources = self.compute_model.get_node_used_resources(
            destination_node)

        # capacity requested by the compute node
        total_cores = used_resources['vcpu'] + instance_to_migrate.vcpus
        total_disk = used_resources['disk'] + instance_to_migrate.disk
        total_mem = used_resources['memory'] + instance_to_migrate.memory

        return self.check_threshold(destination_node, total_cores, total_disk,
                                    total_mem)

    def check_threshold(self, destination_node, total_cores,
                        total_disk, total_mem):
        """Check threshold

        Check the threshold value defined by the ratio of
        aggregated CPU capacity of VMs on one node to CPU capacity
        of this node must not exceed the threshold value.

        :param destination_node: the destination of the virtual machine
        :param total_cores: total cores of the virtual machine
        :param total_disk: total disk size used by the virtual machine
        :param total_mem: total memory used by the virtual machine
        :return: True if the threshold is not exceeded
        """
        cpu_capacity = destination_node.vcpu_capacity
        disk_capacity = destination_node.disk_gb_capacity
        memory_capacity = destination_node.memory_mb_capacity

        return (cpu_capacity >= total_cores * self.threshold_cores and
                disk_capacity >= total_disk * self.threshold_disk and
                memory_capacity >= total_mem * self.threshold_mem)

    def calculate_weight(self, compute_resource, total_cores_used,
                         total_disk_used, total_memory_used):
        """Calculate weight of every resource

        Each score is the used fraction of the resource capacity; the
        returned weight is the unweighted average of the three scores.

        :param compute_resource: object exposing vcpus/disk/memory capacities
        :param total_cores_used: vcpus in use
        :param total_disk_used: disk in use
        :param total_memory_used: memory in use
        :return: utilization score in [0, 1]
        """
        cpu_capacity = compute_resource.vcpus
        disk_capacity = compute_resource.disk
        memory_capacity = compute_resource.memory

        score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
                       float(cpu_capacity))

        # It's possible that disk_capacity is 0, e.g., m1.nano.disk = 0
        if disk_capacity == 0:
            score_disk = 0
        else:
            score_disk = (1 - (float(disk_capacity) - float(total_disk_used)) /
                          float(disk_capacity))

        score_memory = (
            1 - (float(memory_capacity) - float(total_memory_used)) /
            float(memory_capacity))
        # TODO(jed): take in account weight
        return (score_cores + score_disk + score_memory) / 3

    def get_compute_node_cpu_usage(self, compute_node):
        """Fetch the aggregated CPU usage of a compute node."""
        return self.datasource_backend.get_host_cpu_usage(
            compute_node, self.period, self.aggregation_method['compute_node'],
            self.granularity)

    def get_instance_cpu_usage(self, instance):
        """Fetch the aggregated CPU usage of an instance."""
        return self.datasource_backend.get_instance_cpu_usage(
            instance, self.period, self.aggregation_method['instance'],
            self.granularity)

    def calculate_score_node(self, node):
        """Calculate the score that represent the utilization level

        :param node: :py:class:`~.ComputeNode` instance
        :return: Score for the given compute node
        :rtype: float
        """
        host_avg_cpu_util = self.get_compute_node_cpu_usage(node)

        if host_avg_cpu_util is None:
            resource_id = "%s_%s" % (node.uuid, node.hostname)
            LOG.error(
                "No values returned by %(resource_id)s "
                "for %(metric_name)s", dict(
                    resource_id=resource_id,
                    metric_name='host_cpu_usage'))
            # Assume the node is fully loaded so it is never picked as a
            # migration destination on missing data.
            host_avg_cpu_util = 100

        total_cores_used = node.vcpus * (host_avg_cpu_util / 100.0)

        return self.calculate_weight(node, total_cores_used, 0, 0)

    def calculate_score_instance(self, instance):
        """Calculate Score of virtual machine

        :param instance: the virtual machine
        :return: score
        """
        instance_cpu_utilization = self.get_instance_cpu_usage(instance)
        if instance_cpu_utilization is None:
            LOG.error(
                "No values returned by %(resource_id)s "
                "for %(metric_name)s", dict(
                    resource_id=instance.uuid,
                    metric_name='instance_cpu_usage'))
            # Assume full utilization when the datasource has no data.
            instance_cpu_utilization = 100

        total_cores_used = instance.vcpus * (instance_cpu_utilization / 100.0)

        return self.calculate_weight(instance, total_cores_used, 0, 0)

    def add_action_disable_node(self, node):
        """Append an action disabling the given compute node to the solution."""
        parameters = {'state': element.ServiceState.DISABLED.value,
                      'disabled_reason': self.REASON_FOR_DISABLE,
                      'resource_name': node.hostname}
        self.solution.add_action(action_type=self.CHANGE_NOVA_SERVICE_STATE,
                                 resource_id=node.uuid,
                                 input_parameters=parameters)

    def compute_score_of_nodes(self):
        """Calculate score of nodes based on load by VMs"""
        score = []
        for node in self.get_available_compute_nodes().values():
            if node.status == element.ServiceState.ENABLED.value:
                self.number_of_enabled_nodes += 1

            instances = self.compute_model.get_node_instances(node)
            # Nodes that host no instance have nothing to consolidate.
            if len(instances) > 0:
                result = self.calculate_score_node(node)
                score.append((node.uuid, result))

        return score

    def node_and_instance_score(self, sorted_scores):
        """Get List of VMs from node

        :param sorted_scores: node scores sorted by decreasing score
        :return: tuple of (uuid of the node to release, list of
                 (instance, score) pairs for its ACTIVE instances)
        """
        # The lowest-scored (least loaded) node is the release candidate.
        node_to_release = sorted_scores[-1][0]
        instances = self.compute_model.get_node_instances(
            self.compute_model.get_node_by_uuid(node_to_release))

        instances_to_migrate = self.filter_instances_by_audit_tag(instances)
        instance_score = []
        for instance in instances_to_migrate:
            # Only ACTIVE instances can be live-migrated.
            if instance.state == element.InstanceState.ACTIVE.value:
                instance_score.append(
                    (instance, self.calculate_score_instance(instance)))

        return node_to_release, instance_score

    def create_migration_instance(self, mig_instance, mig_source_node,
                                  mig_destination_node):
        """Create migration VM

        Apply the migration on the in-memory model, record the live
        migration action and, if the source node becomes empty, record an
        action disabling it.
        """
        if self.compute_model.migrate_instance(
                mig_instance, mig_source_node, mig_destination_node):
            self.add_action_migrate(mig_instance, 'live',
                                    mig_source_node,
                                    mig_destination_node)

        if len(self.compute_model.get_node_instances(mig_source_node)) == 0:
            self.add_action_disable_node(mig_source_node)
            self.number_of_released_nodes += 1

    def calculate_num_migrations(self, sorted_instances, node_to_release,
                                 sorted_score):
        """Plan migrations off *node_to_release* and return how many fit.

        For each candidate instance the destination nodes are tried in
        score order; the first node with enough room receives the instance.
        """
        number_migrations = 0
        for mig_instance, __ in sorted_instances:
            # skip exclude instance when migrating
            if mig_instance.watcher_exclude:
                LOG.debug("Instance is excluded by scope, "
                          "skipped: %s", mig_instance.uuid)
                continue
            for node_uuid, __ in sorted_score:
                mig_source_node = self.compute_model.get_node_by_uuid(
                    node_to_release)
                mig_destination_node = self.compute_model.get_node_by_uuid(
                    node_uuid)

                result = self.check_migration(
                    mig_source_node, mig_destination_node, mig_instance)
                if result:
                    self.create_migration_instance(
                        mig_instance, mig_source_node, mig_destination_node)
                    number_migrations += 1
                    break
        return number_migrations

    def unsuccessful_migration_actualization(self, number_migrations,
                                             unsuccessful_migration):
        """Update migration counters.

        :return: the new unsuccessful-migration count — reset to 0 when at
                 least one migration was planned, incremented otherwise
        """
        if number_migrations > 0:
            self.number_of_migrations += number_migrations
            return 0
        else:
            return unsuccessful_migration + 1

    def pre_execute(self):
        """Run the common pre-execution and normalize input parameters."""
        self._pre_execute()

        # backwards compatibility for node parameter.
        if self.aggregation_method['node'] != '':
            LOG.warning('Parameter node has been renamed to compute_node and '
                        'will be removed in next release.')
            self.aggregation_method['compute_node'] = \
                self.aggregation_method['node']

    def do_execute(self, audit=None):
        """Run the Best Fit Decreasing consolidation algorithm."""
        unsuccessful_migration = 0

        scores = self.compute_score_of_nodes()
        # Sort compute nodes by Score decreasing
        sorted_scores = sorted(scores, reverse=True, key=lambda x: (x[1]))
        LOG.debug("Compute node(s) BFD %s", sorted_scores)
        # Get Node to be released
        if len(scores) == 0:
            LOG.warning(
                "The workloads of the compute nodes"
                " of the cluster is zero")
            return

        # Keep consolidating while nodes remain and the attempt budget
        # (0 means unlimited) is not exhausted.
        while sorted_scores and (
                not self.migration_attempts or
                self.migration_attempts >= unsuccessful_migration):
            node_to_release, instance_score = self.node_and_instance_score(
                sorted_scores)

            # Sort instances by Score
            sorted_instances = sorted(
                instance_score, reverse=True, key=lambda x: (x[1]))
            # BFD: Best Fit Decrease
            LOG.debug("Instance(s) BFD %s", sorted_instances)

            migrations = self.calculate_num_migrations(
                sorted_instances, node_to_release, sorted_scores)

            unsuccessful_migration = self.unsuccessful_migration_actualization(
                migrations, unsuccessful_migration)

            if not migrations:
                # We don't have any possible migrations to perform on this node
                # so we discard the node so we can try to migrate instances
                # from the next one in the list
                sorted_scores.pop()

        infos = {
            "compute_nodes_count": self.number_of_enabled_nodes,
            "released_compute_nodes_count": self.number_of_released_nodes,
            "instance_migrations_count": self.number_of_migrations,
            "efficacy": self.efficacy
        }
        LOG.debug(infos)

    def post_execute(self):
        """Publish efficacy indicators after the strategy has run."""
        self.solution.set_efficacy_indicators(
            compute_nodes_count=self.number_of_enabled_nodes,
            released_compute_nodes_count=self.number_of_released_nodes,
            instance_migrations_count=self.number_of_migrations,
        )
        LOG.debug(self.compute_model.to_string())