Coverage for watcher/decision_engine/strategy/strategies/workload_balance.py: 80%

154 statements  

coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel Corp
#
# Authors: Junjie-Huang <junjie.huang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from oslo_log import log

from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

LOG = log.getLogger(__name__)


class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
    """[PoC]Workload balance using live migration

    *Description*

    This is a migration strategy based on the VM workload of physical
    servers. It generates solutions to move a workload whenever a server's
    CPU or RAM utilization % is higher than the specified threshold.
    The VM to be moved should bring the host close to the average workload
    of all compute nodes.

    *Requirements*

    * Hardware: compute nodes should use the same physical CPUs/RAM
    * Software: the Ceilometer component ceilometer-agent-compute must be
      running on each compute node, and the Ceilometer API must be able to
      report the "instance_cpu_usage" and "instance_ram_usage" telemetry
      successfully.
    * You must have at least 2 physical compute nodes to run this strategy.

    *Limitations*

    - This is a proof of concept that is not meant to be used in production
    - We cannot forecast how many servers should be migrated. This is the
      reason why we only plan a single virtual machine migration at a time.
      So it's better to use this algorithm with `CONTINUOUS` audits.
    - It assumes that live migrations are possible
    """

    # The meter to report CPU utilization % of VM in ceilometer
    # Unit: %, value range is [0, 100]

    # The meter to report memory resident of VM in ceilometer
    # Unit: MB

    DATASOURCE_METRICS = ['instance_cpu_usage', 'instance_ram_usage']

    def __init__(self, config, osc=None):
        """Workload balance using live migration

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(WorkloadBalance, self).__init__(config, osc)
        # the migration plan will be triggered when the CPU or RAM
        # utilization % reaches the threshold
        self._meter = None
        self.instance_migrations_count = 0

    @classmethod
    def get_name(cls):
        return "workload_balance"

    @classmethod
    def get_display_name(cls):
        return _("Workload Balance Migration Strategy")

    @classmethod
    def get_translatable_display_name(cls):
        return "Workload Balance Migration Strategy"

    @property
    def granularity(self):
        return self.input_parameters.get('granularity', 300)

    @classmethod
    def get_schema(cls):
        # Mandatory default setting for each element
        return {
            "properties": {
                "metrics": {
                    "description": "Workload balance based on metrics: "
                                   "cpu or ram utilization",
                    "type": "string",
                    "choice": ["instance_cpu_usage", "instance_ram_usage"],
                    "default": "instance_cpu_usage"
                },
                "threshold": {
                    "description": "workload threshold for migration",
                    "type": "number",
                    "default": 25.0
                },
                "period": {
                    "description": "aggregate time period of ceilometer",
                    "type": "number",
                    "default": 300
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "default": 300
                },
            },
        }
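    # Example parameter payload accepted by the schema above (illustrative
    # values only):
    #
    #   {"metrics": "instance_ram_usage", "threshold": 80.0,
    #    "period": 600, "granularity": 300}
    #
    # Any omitted key falls back to its declared "default".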

    def get_available_compute_nodes(self):
        default_node_scope = [element.ServiceState.ENABLED.value]
        return {uuid: cn for uuid, cn in
                self.compute_model.get_all_compute_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def choose_instance_to_migrate(self, hosts, avg_workload, workload_cache):
        """Pick an active instance to migrate from the provided hosts

        :param hosts: list of dicts, each containing a 'compute_node'
            object and its 'workload' value
        :param avg_workload: the average workload value of all nodes
        :param workload_cache: dict mapping instance UUID to its workload
        """
        for instance_data in hosts:
            source_node = instance_data['compute_node']
            source_instances = self.compute_model.get_node_instances(
                source_node)
            if source_instances:
                delta_workload = instance_data['workload'] - avg_workload
                min_delta = 1000000
                instance_id = None
                for instance in source_instances:
                    try:
                        # NOTE: skip excluded instances when migrating
                        if instance.watcher_exclude:  # coverage: never true
                            LOG.debug("Instance is excluded by scope, "
                                      "skipped: %s", instance.uuid)
                            continue
                        # select the first active VM to migrate
                        if (instance.state !=  # coverage: never true
                                element.InstanceState.ACTIVE.value):
                            LOG.debug("Instance not active, skipped: %s",
                                      instance.uuid)
                            continue
                        current_delta = (
                            delta_workload - workload_cache[instance.uuid])
                        if 0 <= current_delta < min_delta:
                            min_delta = current_delta
                            instance_id = instance.uuid
                    except exception.InstanceNotFound:
                        LOG.error("Instance not found; error: %s",
                                  instance_id)
                if instance_id:  # coverage: always true
                    return (source_node,
                            self.compute_model.get_instance_by_uuid(
                                instance_id))
                else:
                    LOG.info("VM not found from compute_node: %s",
                             source_node.uuid)
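    # The selection criterion above, spelled out: delta_workload is the
    # host's excess over the cluster average. Migrating an instance with
    # workload w leaves an excess of delta_workload - w, so the loop keeps
    # the instance with the smallest non-negative remainder, i.e. the one
    # whose removal brings the host closest to the average without dropping
    # below it.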

    def filter_destination_hosts(self, hosts, instance_to_migrate,
                                 avg_workload, workload_cache):
        """Only return hosts with sufficient available resources"""
        required_cores = instance_to_migrate.vcpus
        required_disk = instance_to_migrate.disk
        required_mem = instance_to_migrate.memory

        # filter out nodes without enough resources
        destination_hosts = []
        src_instance_workload = workload_cache[instance_to_migrate.uuid]
        for instance_data in hosts:
            host = instance_data['compute_node']
            workload = instance_data['workload']
            # calculate the available resources
            free_res = self.compute_model.get_node_free_resources(host)
            if (free_res['vcpu'] >= required_cores and  # coverage: always true
                    free_res['memory'] >= required_mem and
                    free_res['disk'] >= required_disk):
                if self._meter == 'instance_cpu_usage':  # coverage: always true
                    usage = src_instance_workload + workload
                    usage_percent = usage / host.vcpus * 100
                    limit = self.threshold / 100 * host.vcpus
                    if usage < limit:  # coverage: always true
                        destination_hosts.append(instance_data)
                if self._meter == 'instance_ram_usage':  # coverage: never true
                    usage = src_instance_workload + workload
                    usage_percent = usage / host.memory * 100
                    limit = self.threshold / 100 * host.memory
                    if usage < limit:
                        destination_hosts.append(instance_data)
                LOG.debug(f"Host {host.hostname} evaluated as destination "
                          f"for {instance_to_migrate.uuid}. Host usage "
                          f"for {self._meter} metric would be {usage_percent}. "
                          f"The threshold is: {self.threshold}. "
                          f"selected: {usage < limit}")
        return destination_hosts
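    # Worked example of the acceptance test above (hypothetical numbers):
    # with threshold=25.0 and a 16-vCPU destination host,
    # limit = 25.0 / 100 * 16 = 4.0 "cores" of workload, so a candidate is
    # kept only while the migrating instance's workload plus the host's
    # current workload stays under 4.0. The RAM branch is identical with
    # host.memory (MB) in place of host.vcpus.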

    def group_hosts_by_cpu_or_ram_util(self):
        """Calculate the workload of each compute node

        Find the nodes that have reached the threshold and the nodes that
        are under it, calculate the average workload of all nodes, and
        build the instance-to-workload map.
        """

        nodes = self.get_available_compute_nodes()
        cluster_size = len(nodes)
        overload_hosts = []
        nonoverload_hosts = []
        # total workload of the cluster
        cluster_workload = 0.0
        # use workload_cache to store the workload of VMs for reuse
        workload_cache = {}
        for node_id in nodes:
            node = self.compute_model.get_node_by_uuid(node_id)
            instances = self.compute_model.get_node_instances(node)
            node_workload = 0.0
            for instance in instances:
                util = None
                try:
                    util = self.datasource_backend.statistic_aggregation(
                        instance, 'instance', self._meter, self._period,
                        'mean', self._granularity)
                except Exception as exc:
                    LOG.exception(exc)
                    LOG.error("Can not get %s from %s", self._meter,
                              self.datasource_backend.NAME)
                    continue
                if util is None:  # coverage: never true
                    LOG.debug("Instance (%s): %s is None",
                              instance.uuid, self._meter)
                    continue
                if self._meter == 'instance_cpu_usage':
                    workload_cache[instance.uuid] = (util *
                                                     instance.vcpus / 100)
                else:
                    workload_cache[instance.uuid] = util
                node_workload += workload_cache[instance.uuid]
                LOG.debug("VM (%s): %s %f", instance.uuid, self._meter,
                          util)

            cluster_workload += node_workload
            if self._meter == 'instance_cpu_usage':
                node_util = node_workload / node.vcpus * 100
                host_metric = 'host_cpu_usage_percent'
            else:
                node_util = node_workload / node.memory * 100
                host_metric = 'host_ram_usage_percent'

            instance_data = {
                'compute_node': node, self._meter: node_util,
                'workload': node_workload}
            if node_util >= self.threshold:
                # mark the node to release resources
                overload_hosts.append(instance_data)
            else:
                nonoverload_hosts.append(instance_data)
            LOG.debug(f"Host usage for {node_id}: {host_metric} is "
                      f"{node_util}. Higher than threshold {self.threshold}: "
                      f"{node_util >= self.threshold}")

        avg_workload = 0
        if cluster_size != 0:  # coverage: always true
            avg_workload = cluster_workload / cluster_size

        return overload_hosts, nonoverload_hosts, avg_workload, workload_cache
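    # Unit check for workload_cache (hypothetical numbers): with the
    # instance_cpu_usage meter, a 4-vCPU instance reporting 50% utilization
    # contributes 50 * 4 / 100 = 2.0 "cores" of workload; with
    # instance_ram_usage, the reported resident memory in MB is used as-is.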

    def pre_execute(self):
        self._pre_execute()
        self.threshold = self.input_parameters.threshold
        self._period = self.input_parameters.period
        self._meter = self.input_parameters.metrics
        self._granularity = self.input_parameters.granularity

    def do_execute(self, audit=None):
        """Strategy execution phase

        This phase is where you should put the main logic of your strategy.
        """
        source_nodes, target_nodes, avg_workload, workload_cache = (
            self.group_hosts_by_cpu_or_ram_util())

        if not source_nodes:
            LOG.debug("No hosts require optimization")
            return self.solution

        if not target_nodes:  # coverage: never true
            LOG.warning("No hosts currently have CPU utilization under %s "
                        "percent, therefore there are no possible target "
                        "hosts for any migration",
                        self.threshold)
            return self.solution

        # choose the server with the largest usage of the selected metric
        source_nodes = sorted(source_nodes,
                              reverse=True,
                              key=lambda x: (x[self._meter]))

        instance_to_migrate = self.choose_instance_to_migrate(
            source_nodes, avg_workload, workload_cache)
        if not instance_to_migrate:  # coverage: never true
            return self.solution
        source_node, instance_src = instance_to_migrate
        # find the hosts that have enough resources for the VM to be migrated
        destination_hosts = self.filter_destination_hosts(
            target_nodes, instance_src, avg_workload, workload_cache)
        # sort the filtered result by workload and
        # pick the lowest one as the destination server
        if not destination_hosts:  # coverage: never true
            LOG.warning("No proper target host could be found; it might "
                        "be because there is not enough CPU/Memory/Disk")
            return self.solution
        destination_hosts = sorted(destination_hosts,
                                   key=lambda x: (x[self._meter]))
        # always use the host with the lowest utilization
        mig_destination_node = destination_hosts[0]['compute_node']
        # generate a solution to migrate the instance to the dest server
        if self.compute_model.migrate_instance(  # coverage: always true
                instance_src, source_node, mig_destination_node):
            self.add_action_migrate(
                instance_src,
                'live',
                source_node,
                mig_destination_node)
            self.instance_migrations_count += 1

    def post_execute(self):
        """Post-execution phase

        This can be used to compute the global efficacy
        """
        self.solution.model = self.compute_model
        self.solution.set_efficacy_indicators(
            instance_migrations_count=self.instance_migrations_count,
            instances_count=len(self.compute_model.get_all_instances())
        )

        LOG.debug(self.compute_model.to_string())
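# End-to-end flow, for orientation: the decision engine calls pre_execute()
# (read the audit's input parameters), then do_execute() (group hosts by
# utilization, pick one instance from the most loaded host, pick the least
# loaded acceptable destination, and plan a single live migration), then
# post_execute() (attach efficacy indicators to the solution). Because at
# most one migration is planned per run, CONTINUOUS audits are the intended
# way to keep rebalancing over time.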