Coverage for watcher/decision_engine/planner/workload_stabilization.py: 90%
141 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
17import abc
19from oslo_config import cfg
20from oslo_log import log
22from watcher.common import clients
23from watcher.common import exception
24from watcher.common import nova_helper
25from watcher.common import utils
26from watcher.decision_engine.planner import base
27from watcher import objects
29LOG = log.getLogger(__name__)
32class WorkloadStabilizationPlanner(base.BasePlanner):
33 """Workload Stabilization planner implementation
35 This implementation comes with basic rules with a set of action types that
36 are weighted. An action having a lower weight will be scheduled before the
37 other ones. The set of action types can be specified by 'weights' in the
38 ``watcher.conf``. You need to associate a different weight to all available
39 actions into the configuration file, otherwise you will get an error when
40 the new action will be referenced in the solution produced by a strategy.
42 *Limitations*
44 - This is a proof of concept that is not meant to be used in production
45 """
47 def __init__(self, config):
48 super(WorkloadStabilizationPlanner, self).__init__(config)
49 self._osc = clients.OpenStackClients()
51 @property
52 def osc(self):
53 return self._osc
55 weights_dict = {
56 'turn_host_to_acpi_s3_state': 0,
57 'resize': 1,
58 'migrate': 2,
59 'sleep': 3,
60 'change_nova_service_state': 4,
61 'nop': 5,
62 }
64 @classmethod
65 def get_config_opts(cls):
66 return [
67 cfg.DictOpt(
68 'weights',
69 help="These weights are used to schedule the actions",
70 default=cls.weights_dict),
71 ]
73 def create_action(self,
74 action_plan_id,
75 action_type,
76 input_parameters=None):
77 uuid = utils.generate_uuid()
78 action = {
79 'uuid': uuid,
80 'action_plan_id': int(action_plan_id),
81 'action_type': action_type,
82 'input_parameters': input_parameters,
83 'state': objects.action.State.PENDING,
84 'parents': None
85 }
87 return action
89 def load_child_class(self, child_name):
90 for c in BaseActionValidator.__subclasses__():
91 if child_name == c.action_name:
92 return c()
93 return None
95 def schedule(self, context, audit_id, solution):
96 LOG.debug('Creating an action plan for the audit uuid: %s', audit_id)
97 weights = self.config.weights
98 action_plan = self._create_action_plan(context, audit_id, solution)
100 actions = list(solution.actions)
101 to_schedule = []
102 for action in actions:
103 json_action = self.create_action(
104 action_plan_id=action_plan.id,
105 action_type=action.get('action_type'),
106 input_parameters=action.get('input_parameters'))
107 to_schedule.append((weights[action.get('action_type')],
108 json_action))
110 self._create_efficacy_indicators(
111 context, action_plan.id, solution.efficacy_indicators)
113 # scheduling
114 scheduled = sorted(to_schedule, key=lambda weight: (weight[0]),
115 reverse=True)
116 if len(scheduled) == 0:
117 LOG.warning("The action plan is empty")
118 action_plan.state = objects.action_plan.State.SUCCEEDED
119 action_plan.save()
120 else:
121 resource_action_map = {}
122 scheduled_actions = [x[1] for x in scheduled]
123 for action in scheduled_actions:
124 a_type = action['action_type']
125 if a_type != 'turn_host_to_acpi_s3_state':
126 plugin_action = self.load_child_class(
127 action.get("action_type"))
128 if not plugin_action:
129 raise exception.UnsupportedActionType(
130 action_type=action.get("action_type"))
131 db_action = self._create_action(context, action)
132 parents = plugin_action.validate_parents(
133 resource_action_map, action)
134 if parents:
135 db_action.parents = parents
136 db_action.save()
137 # if we have an action that will make host unreachable, we need
138 # to complete all actions (resize and migration type)
139 # related to the host.
140 # Note(alexchadin): turn_host_to_acpi_s3_state doesn't
141 # actually exist. Placed code shows relations between
142 # action types.
143 # TODO(alexchadin): add turn_host_to_acpi_s3_state action type.
144 else:
145 host_to_acpi_s3 = action['input_parameters']['resource_id']
146 host_actions = resource_action_map.get(host_to_acpi_s3)
147 action_parents = []
148 if host_actions: 148 ↛ 166line 148 didn't jump to line 166 because the condition on line 148 was always true
149 resize_actions = [x[0] for x in host_actions
150 if x[1] == 'resize']
151 migrate_actions = [x[0] for x in host_actions
152 if x[1] == 'migrate']
153 resize_migration_parents = [
154 x.parents for x in
155 [objects.Action.get_by_uuid(context, resize_action)
156 for resize_action in resize_actions]]
157 # resize_migration_parents should be one level list
158 resize_migration_parents = [
159 parent for sublist in resize_migration_parents
160 for parent in sublist]
161 action_parents.extend([uuid for uuid in
162 resize_actions])
163 action_parents.extend([uuid for uuid in
164 migrate_actions if uuid not in
165 resize_migration_parents])
166 db_action = self._create_action(context, action)
167 db_action.parents = action_parents
168 db_action.save()
170 return action_plan
172 def _create_action_plan(self, context, audit_id, solution):
173 strategy = objects.Strategy.get_by_name(
174 context, solution.strategy.name)
176 action_plan_dict = {
177 'uuid': utils.generate_uuid(),
178 'audit_id': audit_id,
179 'strategy_id': strategy.id,
180 'state': objects.action_plan.State.RECOMMENDED,
181 'global_efficacy': solution.global_efficacy,
182 }
184 new_action_plan = objects.ActionPlan(context, **action_plan_dict)
185 new_action_plan.create()
187 return new_action_plan
189 def _create_efficacy_indicators(self, context, action_plan_id, indicators):
190 efficacy_indicators = []
191 for indicator in indicators: 191 ↛ 192line 191 didn't jump to line 192 because the loop on line 191 never started
192 efficacy_indicator_dict = {
193 'uuid': utils.generate_uuid(),
194 'name': indicator.name,
195 'description': indicator.description,
196 'unit': indicator.unit,
197 'value': indicator.value,
198 'action_plan_id': action_plan_id,
199 }
200 new_efficacy_indicator = objects.EfficacyIndicator(
201 context, **efficacy_indicator_dict)
202 new_efficacy_indicator.create()
204 efficacy_indicators.append(new_efficacy_indicator)
205 return efficacy_indicators
207 def _create_action(self, context, _action):
208 try:
209 LOG.debug("Creating the %s in the Watcher database",
210 _action.get("action_type"))
212 new_action = objects.Action(context, **_action)
213 new_action.create()
215 return new_action
216 except Exception as exc:
217 LOG.exception(exc)
218 raise
221class BaseActionValidator(object):
222 action_name = None
224 def __init__(self):
225 super(BaseActionValidator, self).__init__()
226 self._osc = None
228 @property
229 def osc(self):
230 if not self._osc: 230 ↛ 232line 230 didn't jump to line 232 because the condition on line 230 was always true
231 self._osc = clients.OpenStackClients()
232 return self._osc
234 @abc.abstractmethod
235 def validate_parents(self, resource_action_map, action):
236 raise NotImplementedError()
238 def _mapping(self, resource_action_map, resource_id, action_uuid,
239 action_type):
240 if resource_id not in resource_action_map:
241 resource_action_map[resource_id] = [(action_uuid,
242 action_type,)]
243 else:
244 resource_action_map[resource_id].append((action_uuid,
245 action_type,))
248class MigrationActionValidator(BaseActionValidator):
249 action_name = "migrate"
251 def validate_parents(self, resource_action_map, action):
252 instance_uuid = action['input_parameters']['resource_id']
253 host_name = action['input_parameters']['source_node']
254 self._mapping(resource_action_map, instance_uuid, action['uuid'],
255 'migrate')
256 self._mapping(resource_action_map, host_name, action['uuid'],
257 'migrate')
260class ResizeActionValidator(BaseActionValidator):
261 action_name = "resize"
263 def validate_parents(self, resource_action_map, action):
264 nova = nova_helper.NovaHelper(osc=self.osc)
265 instance_uuid = action['input_parameters']['resource_id']
266 parent_actions = resource_action_map.get(instance_uuid)
267 host_of_instance = nova.get_hostname(
268 nova.get_instance_by_uuid(instance_uuid)[0])
269 self._mapping(resource_action_map, host_of_instance, action['uuid'],
270 'resize')
271 if parent_actions: 271 ↛ 274line 271 didn't jump to line 274 because the condition on line 271 was always true
272 return [x[0] for x in parent_actions]
273 else:
274 return []
277class ChangeNovaServiceStateActionValidator(BaseActionValidator):
278 action_name = "change_nova_service_state"
280 def validate_parents(self, resource_action_map, action):
281 host_name = action['input_parameters']['resource_id']
282 self._mapping(resource_action_map, host_name, action['uuid'],
283 'change_nova_service_state')
284 return []
287class SleepActionValidator(BaseActionValidator):
288 action_name = "sleep"
290 def validate_parents(self, resource_action_map, action):
291 return []
294class NOPActionValidator(BaseActionValidator):
295 action_name = "nop"
297 def validate_parents(self, resource_action_map, action):
298 return []