Coverage for watcher/decision_engine/strategy/strategies/uniform_airflow.py: 85%
136 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel Corp
#
# Authors: Junjie-Huang <junjie.huang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from oslo_log import log

from watcher._i18n import _
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

LOG = log.getLogger(__name__)
class UniformAirflow(base.BaseStrategy):
    """[PoC]Uniform Airflow using live migration

    *Description*

    It is a migration strategy based on the airflow of physical
    servers. It generates solutions to move VM whenever a server's
    airflow is higher than the specified threshold.

    *Requirements*

    * Hardware: compute node with NodeManager 3.0 support
    * Software: Ceilometer component ceilometer-agent-compute running
      in each compute node, and Ceilometer API can report such telemetry
      "airflow, system power, inlet temperature" successfully.
    * You must have at least 2 physical compute nodes to run this strategy

    *Limitations*

    - This is a proof of concept that is not meant to be used in production.
    - We cannot forecast how many servers should be migrated. This is the
      reason why we only plan a single virtual machine migration at a time.
      So it's better to use this algorithm with `CONTINUOUS` audits.
    - It assumes that live migrations are possible.
    """

    # choose 300 seconds as the default duration of meter aggregation
    PERIOD = 300

    # metrics this strategy pulls from the configured datasource backend
    DATASOURCE_METRICS = ['host_airflow', 'host_inlet_temp', 'host_power']

    def __init__(self, config, osc=None):
        """Using live migration

        :param config: A mapping containing the configuration of this strategy
        :type config: dict
        :param osc: an OpenStackClients object
        """
        super(UniformAirflow, self).__init__(config, osc)
        # The migration plan will be triggered when the airflow reaches
        # threshold
        self._period = self.PERIOD

    @classmethod
    def get_name(cls):
        return "uniform_airflow"

    @classmethod
    def get_display_name(cls):
        return _("Uniform airflow migration strategy")

    @classmethod
    def get_translatable_display_name(cls):
        return "Uniform airflow migration strategy"

    @classmethod
    def get_goal_name(cls):
        return "airflow_optimization"

    @property
    def granularity(self):
        # spacing (seconds) between two measures in an aggregated timeseries
        return self.input_parameters.get('granularity', 300)

    @classmethod
    def get_schema(cls):
        # Mandatory default setting for each element
        return {
            "properties": {
                "threshold_airflow": {
                    "description": ("airflow threshold for migration, Unit is "
                                    "0.1CFM"),
                    "type": "number",
                    "default": 400.0
                },
                "threshold_inlet_t": {
                    "description": ("inlet temperature threshold for "
                                    "migration decision"),
                    "type": "number",
                    "default": 28.0
                },
                "threshold_power": {
                    "description": ("system power threshold for migration "
                                    "decision"),
                    "type": "number",
                    "default": 350.0
                },
                "period": {
                    "description": "aggregate time period of ceilometer",
                    "type": "number",
                    "default": 300
                },
                "granularity": {
                    "description": "The time between two measures in an "
                                   "aggregated timeseries of a metric.",
                    "type": "number",
                    "default": 300
                },
            },
        }

    def get_available_compute_nodes(self):
        """Return the compute nodes that are online and enabled.

        :return: dict mapping node uuid to its compute node object
        """
        default_node_scope = [element.ServiceState.ENABLED.value]
        return {uuid: cn for uuid, cn in
                self.compute_model.get_all_compute_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def calculate_used_resource(self, node):
        """Compute the used vcpus, memory and disk based on instance flavors"""
        used_res = self.compute_model.get_node_used_resources(node)

        return used_res['vcpu'], used_res['memory'], used_res['disk']

    def choose_instance_to_migrate(self, hosts):
        """Pick up an active instance to migrate from provided hosts

        :param hosts: the array of dict which contains node object
        :return: (source node, list of instances to migrate) for the first
                 host that has instances, or None implicitly when no host
                 in ``hosts`` has any instance.
        """
        instances_tobe_migrate = []
        for nodemap in hosts:
            source_node = nodemap['node']
            source_instances = self.compute_model.get_node_instances(
                source_node)
            if source_instances:
                # NOTE(review): resource_type='instance' is passed here even
                # though the resource is a compute node, while
                # group_hosts_by_airflow uses 'compute_node' for a similar
                # query -- confirm against the datasource backend API.
                inlet_temp = self.datasource_backend.statistic_aggregation(
                    resource=source_node,
                    resource_type='instance',
                    meter_name='host_inlet_temp',
                    period=self._period,
                    granularity=self.granularity)
                power = self.datasource_backend.statistic_aggregation(
                    resource=source_node,
                    resource_type='instance',
                    meter_name='host_power',
                    period=self._period,
                    granularity=self.granularity)
                if (power < self.threshold_power and
                        inlet_temp < self.threshold_inlet_t):
                    # hardware issue, migrate all instances from this node
                    for instance in source_instances:
                        instances_tobe_migrate.append(instance)
                    return source_node, instances_tobe_migrate
                else:
                    # migrate the first active instance
                    for instance in source_instances:
                        # NOTE: skip exclude instance when migrating
                        if instance.watcher_exclude:
                            LOG.debug("Instance is excluded by scope, "
                                      "skipped: %s", instance.uuid)
                            continue
                        if (instance.state !=
                                element.InstanceState.ACTIVE.value):
                            LOG.info(
                                "Instance not active, skipped: %s",
                                instance.uuid)
                            continue
                        # return as soon as one eligible instance is found:
                        # only a single migration is planned per audit run
                        instances_tobe_migrate.append(instance)
                        return source_node, instances_tobe_migrate
            else:
                LOG.info("Instance not found on node: %s",
                         source_node.uuid)

    def filter_destination_hosts(self, hosts, instances_to_migrate):
        """Find instance and host with sufficient available resources

        :param hosts: candidate target nodemaps ({'node': ..., 'airflow': ...})
        :param instances_to_migrate: instances that need a destination
        :return: list of {'instance': ..., 'node': ...} dicts, or None when
                 not every instance could be placed
        """
        # large instances go first
        instances_to_migrate = sorted(
            instances_to_migrate, reverse=True,
            key=lambda x: (x.vcpus))
        # find hosts for instances
        destination_hosts = []
        for instance_to_migrate in instances_to_migrate:
            required_cores = instance_to_migrate.vcpus
            required_disk = instance_to_migrate.disk
            required_mem = instance_to_migrate.memory
            dest_migrate_info = {}
            for nodemap in hosts:
                host = nodemap['node']
                if 'cores_used' not in nodemap:
                    # calculate the available resources
                    nodemap['cores_used'], nodemap['mem_used'], \
                        nodemap['disk_used'] = self.calculate_used_resource(
                            host)
                cores_available = (host.vcpus -
                                   nodemap['cores_used'])
                disk_available = (host.disk -
                                  nodemap['disk_used'])
                mem_available = (
                    host.memory - nodemap['mem_used'])
                if (cores_available >= required_cores and
                        disk_available >= required_disk and
                        mem_available >= required_mem):
                    dest_migrate_info['instance'] = instance_to_migrate
                    dest_migrate_info['node'] = host
                    # reserve the resources on the chosen host so later
                    # instances in this loop see the reduced capacity
                    nodemap['cores_used'] += required_cores
                    nodemap['mem_used'] += required_mem
                    nodemap['disk_used'] += required_disk
                    destination_hosts.append(dest_migrate_info)
                    break
        # check if all instances have target hosts
        if len(destination_hosts) != len(instances_to_migrate):
            LOG.warning("Not all target hosts could be found; it might "
                        "be because there is not enough resource")
            return None
        return destination_hosts

    def group_hosts_by_airflow(self):
        """Group hosts based on airflow meters

        :return: (overloaded nodemaps, non-overloaded nodemaps); hosts with
                 no airflow data are silently dropped from both groups
        """
        nodes = self.get_available_compute_nodes()
        overload_hosts = []
        nonoverload_hosts = []
        for node_id in nodes:
            airflow = None
            node = self.compute_model.get_node_by_uuid(
                node_id)
            airflow = self.datasource_backend.statistic_aggregation(
                resource=node,
                resource_type='compute_node',
                meter_name='host_airflow',
                period=self._period,
                granularity=self.granularity)
            # some hosts may not have airflow meter, remove from target
            if airflow is None:
                LOG.warning("%s: no airflow data", node.uuid)
                continue

            LOG.debug("%(resource)s: airflow %(airflow)f",
                      {'resource': node, 'airflow': airflow})
            nodemap = {'node': node, 'airflow': airflow}
            if airflow >= self.threshold_airflow:
                # mark the node to release resources
                overload_hosts.append(nodemap)
            else:
                nonoverload_hosts.append(nodemap)
        return overload_hosts, nonoverload_hosts

    def pre_execute(self):
        """Cache meter names and thresholds from the audit input parameters."""
        self._pre_execute()
        self.meter_name_airflow = 'host_airflow'
        self.meter_name_inlet_t = 'host_inlet_temp'
        self.meter_name_power = 'host_power'

        self.threshold_airflow = self.input_parameters.threshold_airflow
        self.threshold_inlet_t = self.input_parameters.threshold_inlet_t
        self.threshold_power = self.input_parameters.threshold_power
        self._period = self.input_parameters.period

    def do_execute(self, audit=None):
        """Build migration actions moving load off the hottest nodes.

        :param audit: the audit being executed (unused here)
        :return: the (possibly empty) solution being built
        """
        source_nodes, target_nodes = self.group_hosts_by_airflow()

        if not source_nodes:
            LOG.debug("No hosts require optimization")
            return self.solution

        if not target_nodes:
            LOG.warning("No hosts currently have airflow under %s, "
                        "therefore there are no possible target "
                        "hosts for any migration",
                        self.threshold_airflow)
            return self.solution

        # migrate the instance from server with largest airflow first
        source_nodes = sorted(source_nodes,
                              reverse=True,
                              key=lambda x: (x["airflow"]))
        instances_to_migrate = self.choose_instance_to_migrate(source_nodes)
        if not instances_to_migrate:
            return self.solution
        source_node, instances_src = instances_to_migrate
        # sort host with airflow
        target_nodes = sorted(target_nodes, key=lambda x: (x["airflow"]))
        # find the hosts that have enough resource
        # for the instance to be migrated
        destination_hosts = self.filter_destination_hosts(
            target_nodes, instances_src)
        if not destination_hosts:
            LOG.warning("No target host could be found; it might "
                        "be because there is not enough resources")
            return self.solution
        # generate solution to migrate the instance to the dest server,
        for info in destination_hosts:
            instance = info['instance']
            destination_node = info['node']
            if self.compute_model.migrate_instance(
                    instance, source_node, destination_node):
                self.add_action_migrate(
                    instance,
                    'live',
                    source_node,
                    destination_node)

    def post_execute(self):
        """Attach the updated cluster model to the solution."""
        self.solution.model = self.compute_model
        # TODO(v-francoise): Add the indicators to the solution

        LOG.debug(self.compute_model.to_string())