Coverage for watcher/decision_engine/strategy/strategies/uniform_airflow.py: 85%

136 statements  

coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 Intel Corp 

3# 

4# Authors: Junjie-Huang <junjie.huang@intel.com> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# 

19 

20from oslo_log import log 

21 

22from watcher._i18n import _ 

23from watcher.decision_engine.model import element 

24from watcher.decision_engine.strategy.strategies import base 

25 

26LOG = log.getLogger(__name__) 

27 

28 

29class UniformAirflow(base.BaseStrategy): 

30 """[PoC]Uniform Airflow using live migration 

31 

32 *Description* 

33 

34 It is a migration strategy based on the airflow of physical 

35 servers. It generates solutions to move VMs whenever a server's 

36 airflow is higher than the specified threshold. 

37 

38 *Requirements* 

39 

40 * Hardware: compute node with NodeManager 3.0 support 

41 * Software: the Ceilometer component ceilometer-agent-compute running 

42 on each compute node, with the Ceilometer API able to report the 

43 "airflow", "system power" and "inlet temperature" telemetry. 

44 * You must have at least 2 physical compute nodes to run this strategy 

45 

46 *Limitations* 

47 

48 - This is a proof of concept that is not meant to be used in production. 

49 - We cannot forecast how many servers should be migrated. This is the 

50 reason why we only plan a single virtual machine migration at a time. 

51 So it's better to use this algorithm with `CONTINUOUS` audits. 

52 - It assumes that live migrations are possible. 

53 """ 

54 

55 # choose 300 seconds as the default duration of meter aggregation 

56 PERIOD = 300 

57 

58 DATASOURCE_METRICS = ['host_airflow', 'host_inlet_temp', 'host_power'] 

59 

60 def __init__(self, config, osc=None): 

61 """Using live migration 

62 

63 :param config: A mapping containing the configuration of this strategy 

64 :type config: dict 

65 :param osc: an OpenStackClients object 

66 """ 

67 super(UniformAirflow, self).__init__(config, osc) 

68 # The migration plan will be triggered when the airflow reaches 

69 # the threshold 

70 self._period = self.PERIOD 

71 

72 @classmethod 

73 def get_name(cls): 

74 return "uniform_airflow" 

75 

76 @classmethod 

77 def get_display_name(cls): 

78 return _("Uniform airflow migration strategy") 

79 

80 @classmethod 

81 def get_translatable_display_name(cls): 

82 return "Uniform airflow migration strategy" 

83 

84 @classmethod 

85 def get_goal_name(cls): 

86 return "airflow_optimization" 

87 

88 @property 

89 def granularity(self): 

90 return self.input_parameters.get('granularity', 300) 

91 

92 @classmethod 

93 def get_schema(cls): 

94 # Mandatory default setting for each element 

95 return { 

96 "properties": { 

97 "threshold_airflow": { 

98 "description": ("airflow threshold for migration, Unit is " 

99 "0.1CFM"), 

100 "type": "number", 

101 "default": 400.0 

102 }, 

103 "threshold_inlet_t": { 

104 "description": ("inlet temperature threshold for " 

105 "migration decision"), 

106 "type": "number", 

107 "default": 28.0 

108 }, 

109 "threshold_power": { 

110 "description": ("system power threshold for migration " 

111 "decision"), 

112 "type": "number", 

113 "default": 350.0 

114 }, 

115 "period": { 

116 "description": "aggregate time period of ceilometer", 

117 "type": "number", 

118 "default": 300 

119 }, 

120 "granularity": { 

121 "description": "The time between two measures in an " 

122 "aggregated timeseries of a metric.", 

123 "type": "number", 

124 "default": 300 

125 }, 

126 }, 

127 } 

128 
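    # For illustration only: an audit that sticks to the defaults above is
    # equivalent to supplying the following input parameters (values copied
    # from the schema defaults; the Celsius and watt units are assumptions,
    # only the 0.1 CFM unit is stated in the schema):
    #
    #   {
    #       "threshold_airflow": 400.0,   # in 0.1 CFM units
    #       "threshold_inlet_t": 28.0,    # assumed degrees Celsius
    #       "threshold_power": 350.0,     # assumed watts
    #       "period": 300,                # meter aggregation period, seconds
    #       "granularity": 300            # seconds between two measures
    #   }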

129 def get_available_compute_nodes(self): 

130 default_node_scope = [element.ServiceState.ENABLED.value] 

131 return {uuid: cn for uuid, cn in 

132 self.compute_model.get_all_compute_nodes().items() 

133 if cn.state == element.ServiceState.ONLINE.value and 

134 cn.status in default_node_scope} 

135 

136 def calculate_used_resource(self, node): 

137 """Compute the used vcpus, memory and disk based on instance flavors""" 

138 used_res = self.compute_model.get_node_used_resources(node) 

139 

140 return used_res['vcpu'], used_res['memory'], used_res['disk'] 

141 

142 def choose_instance_to_migrate(self, hosts): 

143 """Pick up an active instance to migrate from provided hosts 

144 

145 :param hosts: the array of dict which contains node object 

146 """ 

147 instances_tobe_migrate = [] 

148 for nodemap in hosts: 

149 source_node = nodemap['node'] 

150 source_instances = self.compute_model.get_node_instances( 

151 source_node) 

152 if source_instances: 

153 inlet_temp = self.datasource_backend.statistic_aggregation( 

154 resource=source_node, 

155 resource_type='instance', 

156 meter_name='host_inlet_temp', 

157 period=self._period, 

158 granularity=self.granularity) 

159 power = self.datasource_backend.statistic_aggregation( 

160 resource=source_node, 

161 resource_type='instance', 

162 meter_name='host_power', 

163 period=self._period, 

164 granularity=self.granularity) 

165 if (power < self.threshold_power and 

166 inlet_temp < self.threshold_inlet_t): 

167 # hardware issue, migrate all instances from this node 

168 for instance in source_instances: 

169 instances_tobe_migrate.append(instance) 

170 return source_node, instances_tobe_migrate 

171 else: 

172 # migrate the first active instance 

173 for instance in source_instances:  # coverage: 173 ↛ 148, line 173 didn't jump to line 148 because the loop on line 173 didn't complete

174 # NOTE: skip excluded instances when migrating 

175 if instance.watcher_exclude:  # coverage: 175 ↛ 176, line 175 didn't jump to line 176 because the condition on line 175 was never true

176 LOG.debug("Instance is excluded by scope, " 

177 "skipped: %s", instance.uuid) 

178 continue 

179 if (instance.state !=  # coverage: 179 ↛ 181, line 179 didn't jump to line 181 because the condition on line 179 was never true

180 element.InstanceState.ACTIVE.value): 

181 LOG.info( 

182 "Instance not active, skipped: %s", 

183 instance.uuid) 

184 continue 

185 instances_tobe_migrate.append(instance) 

186 return source_node, instances_tobe_migrate 

187 else: 

188 LOG.info("Instance not found on node: %s", 

189 source_node.uuid) 

190 

191 def filter_destination_hosts(self, hosts, instances_to_migrate): 

192 """Find instance and host with sufficient available resources""" 

193 # large instances go first 

194 instances_to_migrate = sorted( 

195 instances_to_migrate, reverse=True, 

196 key=lambda x: (x.vcpus)) 

197 # find hosts for instances 

198 destination_hosts = [] 

199 for instance_to_migrate in instances_to_migrate: 

200 required_cores = instance_to_migrate.vcpus 

201 required_disk = instance_to_migrate.disk 

202 required_mem = instance_to_migrate.memory 

203 dest_migrate_info = {} 

204 for nodemap in hosts:  # coverage: 204 ↛ 199, line 204 didn't jump to line 199 because the loop on line 204 didn't complete

205 host = nodemap['node'] 

206 if 'cores_used' not in nodemap: 

207 # calculate the available resources 

208 nodemap['cores_used'], nodemap['mem_used'], \ 

209 nodemap['disk_used'] = self.calculate_used_resource( 

210 host) 

211 cores_available = (host.vcpus - 

212 nodemap['cores_used']) 

213 disk_available = (host.disk - 

214 nodemap['disk_used']) 

215 mem_available = ( 

216 host.memory - nodemap['mem_used']) 

217 if (cores_available >= required_cores and  # coverage: 217 ↛ 204, line 217 didn't jump to line 204 because the condition on line 217 was always true

218 disk_available >= required_disk and 

219 mem_available >= required_mem): 

220 dest_migrate_info['instance'] = instance_to_migrate 

221 dest_migrate_info['node'] = host 

222 nodemap['cores_used'] += required_cores 

223 nodemap['mem_used'] += required_mem 

224 nodemap['disk_used'] += required_disk 

225 destination_hosts.append(dest_migrate_info) 

226 break 

227 # check if all instances have target hosts 

228 if len(destination_hosts) != len(instances_to_migrate):  # coverage: 228 ↛ 229, line 228 didn't jump to line 229 because the condition on line 228 was never true

229 LOG.warning("Not all target hosts could be found; it might " 

230 "be because there is not enough resource") 

231 return None 

232 return destination_hosts 

233 

234 def group_hosts_by_airflow(self): 

235 """Group hosts based on airflow meters""" 

236 

237 nodes = self.get_available_compute_nodes() 

238 overload_hosts = [] 

239 nonoverload_hosts = [] 

240 for node_id in nodes: 

241 airflow = None 

242 node = self.compute_model.get_node_by_uuid( 

243 node_id) 

244 airflow = self.datasource_backend.statistic_aggregation( 

245 resource=node, 

246 resource_type='compute_node', 

247 meter_name='host_airflow', 

248 period=self._period, 

249 granularity=self.granularity) 

250 # some hosts may not have an airflow meter; skip them 

251 if airflow is None:  # coverage: 251 ↛ 252, line 251 didn't jump to line 252 because the condition on line 251 was never true

252 LOG.warning("%s: no airflow data", node.uuid) 

253 continue 

254 

255 LOG.debug("%(resource)s: airflow %(airflow)f", 

256 {'resource': node, 'airflow': airflow}) 

257 nodemap = {'node': node, 'airflow': airflow} 

258 if airflow >= self.threshold_airflow: 

259 # mark the node to release resources 

260 overload_hosts.append(nodemap) 

261 else: 

262 nonoverload_hosts.append(nodemap) 

263 return overload_hosts, nonoverload_hosts 

264 

265 def pre_execute(self): 

266 self._pre_execute() 

267 self.meter_name_airflow = 'host_airflow' 

268 self.meter_name_inlet_t = 'host_inlet_temp' 

269 self.meter_name_power = 'host_power' 

270 

271 self.threshold_airflow = self.input_parameters.threshold_airflow 

272 self.threshold_inlet_t = self.input_parameters.threshold_inlet_t 

273 self.threshold_power = self.input_parameters.threshold_power 

274 self._period = self.input_parameters.period 

275 

276 def do_execute(self, audit=None): 

277 source_nodes, target_nodes = self.group_hosts_by_airflow() 

278 

279 if not source_nodes:  # coverage: 279 ↛ 280, line 279 didn't jump to line 280 because the condition on line 279 was never true

280 LOG.debug("No hosts require optimization") 

281 return self.solution 

282 

283 if not target_nodes: 

284 LOG.warning("No hosts currently have airflow under %s, " 

285 "therefore there are no possible target " 

286 "hosts for any migration", 

287 self.threshold_airflow) 

288 return self.solution 

289 

290 # migrate instances from the server with the largest airflow first 

291 source_nodes = sorted(source_nodes, 

292 reverse=True, 

293 key=lambda x: (x["airflow"])) 

294 instances_to_migrate = self.choose_instance_to_migrate(source_nodes) 

295 if not instances_to_migrate:  # coverage: 295 ↛ 296, line 295 didn't jump to line 296 because the condition on line 295 was never true

296 return self.solution 

297 source_node, instances_src = instances_to_migrate 

298 # sort target hosts by airflow, lowest first 

299 target_nodes = sorted(target_nodes, key=lambda x: (x["airflow"])) 

300 # find the hosts that have enough resources 

301 # for the instances to be migrated 

302 destination_hosts = self.filter_destination_hosts( 

303 target_nodes, instances_src) 

304 if not destination_hosts:  # coverage: 304 ↛ 305, line 304 didn't jump to line 305 because the condition on line 304 was never true

305 LOG.warning("No target host could be found; it might " 

306 "be because there is not enough resources") 

307 return self.solution 

308 # generate the solution to migrate the instances to the destination servers 

309 for info in destination_hosts: 

310 instance = info['instance'] 

311 destination_node = info['node'] 

312 if self.compute_model.migrate_instance(  # coverage: 312 ↛ 309, line 312 didn't jump to line 309 because the condition on line 312 was always true

313 instance, source_node, destination_node): 

314 self.add_action_migrate( 

315 instance, 

316 'live', 

317 source_node, 

318 destination_node) 

319 

320 def post_execute(self): 

321 self.solution.model = self.compute_model 

322 # TODO(v-francoise): Add the indicators to the solution 

323 

324 LOG.debug(self.compute_model.to_string())
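
For readers skimming the report, the placement logic in filter_destination_hosts() above is a first-fit pass: instances are sorted largest-first by vcpus, each one takes the first airflow-sorted host with enough free vcpus, disk and memory, and that host's bookkeeping counters are reduced so later instances see the remaining capacity. Below is a minimal, self-contained sketch of that idea, using plain dicts and hypothetical numbers in place of Watcher's compute model objects.

def first_fit(targets, instances):
    """Return a list of {'instance', 'node'} placements, or None if any
    instance cannot be placed (mirroring filter_destination_hosts)."""
    placements = []
    # largest instances go first, mirroring the vcpus sort in the strategy
    for inst in sorted(instances, key=lambda i: i['vcpus'], reverse=True):
        for node in targets:  # assumed already sorted by airflow, lowest first
            if (node['free_vcpus'] >= inst['vcpus'] and
                    node['free_disk'] >= inst['disk'] and
                    node['free_memory'] >= inst['memory']):
                # book the capacity so later instances see the reduced room
                node['free_vcpus'] -= inst['vcpus']
                node['free_disk'] -= inst['disk']
                node['free_memory'] -= inst['memory']
                placements.append({'instance': inst['name'],
                                   'node': node['name']})
                break
    # mirror the strategy: give up if any instance could not be placed
    return placements if len(placements) == len(instances) else None


targets = [{'name': 'node-2', 'free_vcpus': 8, 'free_disk': 100,
            'free_memory': 16384},
           {'name': 'node-3', 'free_vcpus': 2, 'free_disk': 40,
            'free_memory': 4096}]
instances = [{'name': 'vm-a', 'vcpus': 4, 'disk': 20, 'memory': 8192},
             {'name': 'vm-b', 'vcpus': 2, 'disk': 10, 'memory': 2048}]
print(first_fit(targets, instances))
# [{'instance': 'vm-a', 'node': 'node-2'}, {'instance': 'vm-b', 'node': 'node-2'}]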