Coverage for watcher/applier/workflow_engine/base.py: 85%
145 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2016 b<>com
3#
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
19import abc
20import time
22import eventlet
24from oslo_log import log
25from taskflow import task as flow_task
27from watcher.applier.actions import factory
28from watcher.common import clients
29from watcher.common import exception
30from watcher.common.loader import loadable
31from watcher import notifications
32from watcher import objects
33from watcher.objects import fields
36LOG = log.getLogger(__name__)
38CANCEL_STATE = [objects.action_plan.State.CANCELLING,
39 objects.action_plan.State.CANCELLED]
42class BaseWorkFlowEngine(loadable.Loadable, metaclass=abc.ABCMeta):
44 def __init__(self, config, context=None, applier_manager=None):
45 """Constructor
47 :param config: A mapping containing the configuration of this
48 workflow engine
49 :type config: dict
50 :param osc: an OpenStackClients object, defaults to None
51 :type osc: :py:class:`~.OpenStackClients` instance, optional
52 """
53 super(BaseWorkFlowEngine, self).__init__(config)
54 self._context = context
55 self._applier_manager = applier_manager
56 self._action_factory = factory.ActionFactory()
57 self._osc = None
58 self._is_notified = False
59 self.execution_rule = None
61 @classmethod
62 def get_config_opts(cls):
63 """Defines the configuration options to be associated to this loadable
65 :return: A list of configuration options relative to this Loadable
66 :rtype: list of :class:`oslo_config.cfg.Opt` instances
67 """
68 return []
70 @property
71 def context(self):
72 return self._context
74 @property
75 def osc(self):
76 if not self._osc:
77 self._osc = clients.OpenStackClients()
78 return self._osc
80 @property
81 def applier_manager(self):
82 return self._applier_manager
84 @property
85 def action_factory(self):
86 return self._action_factory
88 def notify(self, action, state):
89 db_action = objects.Action.get_by_uuid(self.context, action.uuid,
90 eager=True)
91 db_action.state = state
92 db_action.save()
93 return db_action
95 def notify_cancel_start(self, action_plan_uuid):
96 action_plan = objects.ActionPlan.get_by_uuid(self.context,
97 action_plan_uuid,
98 eager=True)
99 if not self._is_notified:
100 self._is_notified = True
101 notifications.action_plan.send_cancel_notification(
102 self._context, action_plan,
103 action=fields.NotificationAction.CANCEL,
104 phase=fields.NotificationPhase.START)
106 @abc.abstractmethod
107 def execute(self, actions):
108 raise NotImplementedError()
111class BaseTaskFlowActionContainer(flow_task.Task):
113 def __init__(self, name, db_action, engine, **kwargs):
114 super(BaseTaskFlowActionContainer, self).__init__(name=name)
115 self._db_action = db_action
116 self._engine = engine
117 self.loaded_action = None
119 @property
120 def engine(self):
121 return self._engine
123 @property
124 def action(self):
125 if self.loaded_action is None:
126 action = self.engine.action_factory.make_action(
127 self._db_action,
128 osc=self._engine.osc)
129 self.loaded_action = action
130 return self.loaded_action
132 @abc.abstractmethod
133 def do_pre_execute(self):
134 raise NotImplementedError()
136 @abc.abstractmethod
137 def do_execute(self, *args, **kwargs):
138 raise NotImplementedError()
140 @abc.abstractmethod
141 def do_post_execute(self):
142 raise NotImplementedError()
144 @abc.abstractmethod
145 def do_revert(self):
146 raise NotImplementedError()
148 @abc.abstractmethod
149 def do_abort(self, *args, **kwargs):
150 raise NotImplementedError()
152 # NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute,
153 # post_execute) independently. We want to support notifications in base
154 # class, so child's methods should be named with `do_` prefix and wrapped.
155 def pre_execute(self):
156 try:
157 # NOTE(adisky): check the state of action plan before starting
158 # next action, if action plan is cancelled raise the exceptions
159 # so that taskflow does not schedule further actions.
160 action_plan = objects.ActionPlan.get_by_id(
161 self.engine.context, self._db_action.action_plan_id)
162 if action_plan.state in CANCEL_STATE:
163 raise exception.ActionPlanCancelled(uuid=action_plan.uuid)
164 db_action = self.do_pre_execute()
165 notifications.action.send_execution_notification(
166 self.engine.context, db_action,
167 fields.NotificationAction.EXECUTION,
168 fields.NotificationPhase.START)
169 except exception.ActionPlanCancelled as e:
170 LOG.exception(e)
171 self.engine.notify_cancel_start(action_plan.uuid)
172 raise
173 except Exception as e:
174 LOG.exception(e)
175 db_action = self.engine.notify(self._db_action,
176 objects.action.State.FAILED)
177 notifications.action.send_execution_notification(
178 self.engine.context, db_action,
179 fields.NotificationAction.EXECUTION,
180 fields.NotificationPhase.ERROR,
181 priority=fields.NotificationPriority.ERROR)
183 def execute(self, *args, **kwargs):
184 def _do_execute_action(*args, **kwargs):
185 try:
186 db_action = self.do_execute(*args, **kwargs)
187 notifications.action.send_execution_notification(
188 self.engine.context, db_action,
189 fields.NotificationAction.EXECUTION,
190 fields.NotificationPhase.END)
191 except Exception as e:
192 LOG.exception(e)
193 LOG.error('The workflow engine has failed '
194 'to execute the action: %s', self.name)
195 db_action = self.engine.notify(self._db_action,
196 objects.action.State.FAILED)
197 notifications.action.send_execution_notification(
198 self.engine.context, db_action,
199 fields.NotificationAction.EXECUTION,
200 fields.NotificationPhase.ERROR,
201 priority=fields.NotificationPriority.ERROR)
202 raise
203 # NOTE: spawn a new thread for action execution, so that if action plan
204 # is cancelled workflow engine will not wait to finish action execution
205 et = eventlet.spawn(_do_execute_action, *args, **kwargs)
206 # NOTE: check for the state of action plan periodically,so that if
207 # action is finished or action plan is cancelled we can exit from here.
208 result = False
209 while True:
210 action_object = objects.Action.get_by_uuid(
211 self.engine.context, self._db_action.uuid, eager=True)
212 action_plan_object = objects.ActionPlan.get_by_id(
213 self.engine.context, action_object.action_plan_id)
214 if action_object.state == objects.action.State.SUCCEEDED:
215 result = True
216 if (action_object.state in [objects.action.State.SUCCEEDED,
217 objects.action.State.FAILED] or
218 action_plan_object.state in CANCEL_STATE):
219 break
220 time.sleep(1)
221 try:
222 # NOTE: kill the action execution thread, if action plan is
223 # cancelled for all other cases wait for the result from action
224 # execution thread.
225 # Not all actions support abort operations, kill only those action
226 # which support abort operations
227 abort = self.action.check_abort()
228 if (action_plan_object.state in CANCEL_STATE and abort):
229 et.kill()
230 et.wait()
231 return result
233 # NOTE: catch the greenlet exit exception due to thread kill,
234 # taskflow will call revert for the action,
235 # we will redirect it to abort.
236 except eventlet.greenlet.GreenletExit:
237 self.engine.notify_cancel_start(action_plan_object.uuid)
238 raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid)
240 except Exception as e:
241 LOG.exception(e)
242 # return False instead of raising an exception
243 return False
245 def post_execute(self):
246 try:
247 self.do_post_execute()
248 except Exception as e:
249 LOG.exception(e)
250 db_action = self.engine.notify(self._db_action,
251 objects.action.State.FAILED)
252 notifications.action.send_execution_notification(
253 self.engine.context, db_action,
254 fields.NotificationAction.EXECUTION,
255 fields.NotificationPhase.ERROR,
256 priority=fields.NotificationPriority.ERROR)
258 def revert(self, *args, **kwargs):
259 action_plan = objects.ActionPlan.get_by_id(
260 self.engine.context, self._db_action.action_plan_id, eager=True)
261 # NOTE: check if revert cause by cancel action plan or
262 # some other exception occurred during action plan execution
263 # if due to some other exception keep the flow intact.
264 if action_plan.state not in CANCEL_STATE:
265 self.do_revert()
266 return
268 action_object = objects.Action.get_by_uuid(
269 self.engine.context, self._db_action.uuid, eager=True)
270 try:
271 if action_object.state == objects.action.State.ONGOING: 271 ↛ 285line 271 didn't jump to line 285 because the condition on line 271 was always true
272 action_object.state = objects.action.State.CANCELLING
273 action_object.save()
274 notifications.action.send_cancel_notification(
275 self.engine.context, action_object,
276 fields.NotificationAction.CANCEL,
277 fields.NotificationPhase.START)
278 action_object = self.abort()
280 notifications.action.send_cancel_notification(
281 self.engine.context, action_object,
282 fields.NotificationAction.CANCEL,
283 fields.NotificationPhase.END)
285 if action_object.state == objects.action.State.PENDING: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 notifications.action.send_cancel_notification(
287 self.engine.context, action_object,
288 fields.NotificationAction.CANCEL,
289 fields.NotificationPhase.START)
290 action_object.state = objects.action.State.CANCELLED
291 action_object.save()
292 notifications.action.send_cancel_notification(
293 self.engine.context, action_object,
294 fields.NotificationAction.CANCEL,
295 fields.NotificationPhase.END)
297 except Exception as e:
298 LOG.exception(e)
299 action_object.state = objects.action.State.FAILED
300 action_object.save()
301 notifications.action.send_cancel_notification(
302 self.engine.context, action_object,
303 fields.NotificationAction.CANCEL,
304 fields.NotificationPhase.ERROR,
305 priority=fields.NotificationPriority.ERROR)
307 def abort(self, *args, **kwargs):
308 return self.do_abort(*args, **kwargs)