Coverage for watcher/applier/workflow_engine/default.py: 82%
92 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2016 b<>com
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
18from oslo_concurrency import processutils
19from oslo_config import cfg
20from oslo_log import log
21from taskflow import engines
22from taskflow import exceptions as tf_exception
23from taskflow.patterns import graph_flow as gf
24from taskflow import task as flow_task
26from watcher.applier.workflow_engine import base
27from watcher.common import exception
28from watcher import conf
29from watcher import objects
31CONF = conf.CONF
33LOG = log.getLogger(__name__)
36class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
37 """Taskflow as a workflow engine for Watcher
39 Full documentation on taskflow at
40 https://docs.openstack.org/taskflow/latest
41 """
43 def decider(self, history):
44 # decider – A callback function that will be expected to
45 # decide at runtime whether v should be allowed to execute
46 # (or whether the execution of v should be ignored,
47 # and therefore not executed). It is expected to take as single
48 # keyword argument history which will be the execution results of
49 # all u decidable links that have v as a target. It is expected
50 # to return a single boolean
51 # (True to allow v execution or False to not).
52 LOG.info("decider history: %s", history)
53 if history and self.execution_rule == 'ANY':
54 return not list(history.values())[0]
55 else:
56 return True
58 @classmethod
59 def get_config_opts(cls):
60 return [
61 cfg.IntOpt(
62 'max_workers',
63 default=processutils.get_worker_count(),
64 min=1,
65 required=True,
66 help='Number of workers for taskflow engine '
67 'to execute actions.'),
68 cfg.DictOpt(
69 'action_execution_rule',
70 default={},
71 help='The execution rule for linked actions,'
72 'the key is strategy name and '
73 'value ALWAYS means all actions will be executed,'
74 'value ANY means if previous action executes '
75 'success, the next action will be ignored.'
76 'None means ALWAYS.')
77 ]
79 def get_execution_rule(self, actions):
80 if actions:
81 actionplan_object = objects.ActionPlan.get_by_id(
82 self.context, actions[0].action_plan_id)
83 strategy_object = objects.Strategy.get_by_id(
84 self.context, actionplan_object.strategy_id)
85 return self.config.action_execution_rule.get(
86 strategy_object.name)
88 def execute(self, actions):
89 try:
90 # NOTE(jed) We want to have a strong separation of concern
91 # between the Watcher planner and the Watcher Applier in order
92 # to us the possibility to support several workflow engine.
93 # We want to provide the 'taskflow' engine by
94 # default although we still want to leave the possibility for
95 # the users to change it.
96 # The current implementation uses graph with linked actions.
97 # todo(jed) add oslo conf for retry and name
98 self.execution_rule = self.get_execution_rule(actions)
99 flow = gf.Flow("watcher_flow")
100 actions_uuid = {}
101 for a in actions:
102 task = TaskFlowActionContainer(a, self)
103 flow.add(task)
104 actions_uuid[a.uuid] = task
106 for a in actions:
107 for parent_id in a.parents:
108 flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
109 decider=self.decider)
111 e = engines.load(
112 flow, executor='greenthreaded', engine='parallel',
113 max_workers=self.config.max_workers)
114 e.run()
116 return flow
118 except exception.ActionPlanCancelled:
119 raise
121 except tf_exception.WrappedFailure as e:
122 if e.check("watcher.common.exception.ActionPlanCancelled"):
123 raise exception.ActionPlanCancelled
124 else:
125 raise exception.WorkflowExecutionException(error=e)
127 except Exception as e:
128 raise exception.WorkflowExecutionException(error=e)
131class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
132 def __init__(self, db_action, engine):
133 self.name = "action_type:{0} uuid:{1}".format(db_action.action_type,
134 db_action.uuid)
135 super(TaskFlowActionContainer, self).__init__(self.name,
136 db_action,
137 engine)
139 def do_pre_execute(self):
140 db_action = self.engine.notify(self._db_action,
141 objects.action.State.ONGOING)
142 LOG.debug("Pre-condition action: %s", self.name)
143 self.action.pre_condition()
144 return db_action
146 def do_execute(self, *args, **kwargs):
147 LOG.debug("Running action: %s", self.name)
149 # NOTE:Some actions(such as migrate) will return None when exception
150 # Only when True is returned, the action state is set to SUCCEEDED
151 result = self.action.execute()
152 if result is True:
153 return self.engine.notify(self._db_action,
154 objects.action.State.SUCCEEDED)
155 else:
156 self.engine.notify(self._db_action,
157 objects.action.State.FAILED)
158 raise exception.ActionExecutionFailure(
159 action_id=self._db_action.uuid)
161 def do_post_execute(self):
162 LOG.debug("Post-condition action: %s", self.name)
163 self.action.post_condition()
165 def do_revert(self, *args, **kwargs):
166 # NOTE: Not rollback action plan
167 if not CONF.watcher_applier.rollback_when_actionplan_failed:
168 LOG.info("Failed actionplan rollback option is turned off, and "
169 "the following action will be skipped: %s", self.name)
170 return
172 LOG.warning("Revert action: %s", self.name)
173 try:
174 # TODO(jed): do we need to update the states in case of failure?
175 self.action.revert()
176 except Exception as e:
177 LOG.exception(e)
178 LOG.critical("Oops! We need a disaster recover plan.")
180 def do_abort(self, *args, **kwargs):
181 LOG.warning("Aborting action: %s", self.name)
182 try:
183 result = self.action.abort()
184 if result: 184 ↛ 189line 184 didn't jump to line 189 because the condition on line 184 was always true
185 # Aborted the action.
186 return self.engine.notify(self._db_action,
187 objects.action.State.CANCELLED)
188 else:
189 return self.engine.notify(self._db_action,
190 objects.action.State.SUCCEEDED)
191 except Exception as e:
192 LOG.exception(e)
193 return self.engine.notify(self._db_action,
194 objects.action.State.FAILED)
197class TaskFlowNop(flow_task.Task):
198 """This class is used in case of the workflow have only one Action.
200 We need at least two atoms to create a link.
201 """
203 def execute(self):
204 pass