Coverage for watcher/applier/workflow_engine/default.py: 82%

92 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 b<>com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17 

18from oslo_concurrency import processutils 

19from oslo_config import cfg 

20from oslo_log import log 

21from taskflow import engines 

22from taskflow import exceptions as tf_exception 

23from taskflow.patterns import graph_flow as gf 

24from taskflow import task as flow_task 

25 

26from watcher.applier.workflow_engine import base 

27from watcher.common import exception 

28from watcher import conf 

29from watcher import objects 

30 

31CONF = conf.CONF 

32 

33LOG = log.getLogger(__name__) 

34 

35 

class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
    """Taskflow as a workflow engine for Watcher

    Full documentation on taskflow at
    https://docs.openstack.org/taskflow/latest
    """

    # Execution rule of the strategy whose action plan is being executed.
    # execute() overwrites it per run; the class-level default keeps
    # decider() callable (and always permissive) before execute() has run.
    execution_rule = None

    def decider(self, history):
        """Decide at runtime whether a linked action is allowed to execute.

        Used as the ``decider`` callback of every link created in
        :meth:`execute`.

        :param history: execution results of all the decidable links that
                        have the current atom as a target
        :returns: when the execution rule is ``'ANY'`` and ``history`` is
                  not empty, the negated truthiness of the first value in
                  ``history``; ``True`` in every other case
        """
        LOG.info("decider history: %s", history)
        if history and self.execution_rule == 'ANY':
            # Only the first recorded result is consulted.
            return not next(iter(history.values()))
        return True

    @classmethod
    def get_config_opts(cls):
        """Return the oslo.config options declared by this engine.

        :returns: list with the ``max_workers`` and
                  ``action_execution_rule`` options
        """
        return [
            cfg.IntOpt(
                'max_workers',
                default=processutils.get_worker_count(),
                min=1,
                required=True,
                help='Number of workers for taskflow engine '
                     'to execute actions.'),
            cfg.DictOpt(
                'action_execution_rule',
                default={},
                # Adjacent literals are concatenated verbatim, so every
                # fragment must carry its own separating space.
                help='The execution rule for linked actions, '
                     'the key is strategy name and '
                     'value ALWAYS means all actions will be executed, '
                     'value ANY means if previous action executes '
                     'success, the next action will be ignored. '
                     'None means ALWAYS.')
        ]

    def get_execution_rule(self, actions):
        """Look up the execution rule configured for the actions' strategy.

        The strategy is resolved through the action plan of the first
        action only.

        :param actions: sequence of actions; each is expected to expose
                        ``action_plan_id``
        :returns: the value stored under the strategy name in the
                  ``action_execution_rule`` option, or ``None`` when
                  ``actions`` is empty
        """
        if not actions:
            return None
        actionplan_object = objects.ActionPlan.get_by_id(
            self.context, actions[0].action_plan_id)
        strategy_object = objects.Strategy.get_by_id(
            self.context, actionplan_object.strategy_id)
        return self.config.action_execution_rule.get(strategy_object.name)

    def execute(self, actions):
        """Build a taskflow graph from ``actions`` and run it.

        One :class:`TaskFlowActionContainer` is created per action and
        every action is linked to each of its parents, with
        :meth:`decider` deciding at runtime whether the child runs.

        :param actions: actions to run; each is expected to expose
                        ``uuid`` and ``parents`` (uuids of parent actions)
        :returns: the flow that was executed
        :raises: :class:`watcher.common.exception.ActionPlanCancelled` when
                 the action plan was cancelled
        :raises: :class:`watcher.common.exception.
                 WorkflowExecutionException` for any other failure
        """
        try:
            # NOTE(jed) We want to have a strong separation of concern
            # between the Watcher planner and the Watcher Applier in order
            # to give us the possibility to support several workflow
            # engines.
            # We want to provide the 'taskflow' engine by
            # default although we still want to leave the possibility for
            # the users to change it.
            # The current implementation uses graph with linked actions.
            # todo(jed) add oslo conf for retry and name
            self.execution_rule = self.get_execution_rule(actions)
            flow = gf.Flow("watcher_flow")
            actions_uuid = {}
            for a in actions:
                task = TaskFlowActionContainer(a, self)
                flow.add(task)
                actions_uuid[a.uuid] = task

            # Links are added in a second pass so that every task already
            # exists in the flow, whatever the order of ``actions``.
            for a in actions:
                for parent_id in a.parents:
                    flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
                              decider=self.decider)

            e = engines.load(
                flow, executor='greenthreaded', engine='parallel',
                max_workers=self.config.max_workers)
            e.run()

            return flow

        except exception.ActionPlanCancelled:
            raise

        except tf_exception.WrappedFailure as e:
            if e.check("watcher.common.exception.ActionPlanCancelled"):
                raise exception.ActionPlanCancelled
            else:
                raise exception.WorkflowExecutionException(error=e) from e

        except Exception as e:
            raise exception.WorkflowExecutionException(error=e) from e

129 

130 

class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
    """Container added to the taskflow graph for one database action."""

    def __init__(self, db_action, engine):
        """Name the container after the action's type and uuid.

        :param db_action: action object exposing ``action_type`` and
                          ``uuid``
        :param engine: workflow engine handed over to the base container
        """
        name_template = "action_type:{0} uuid:{1}"
        self.name = name_template.format(db_action.action_type,
                                         db_action.uuid)
        super().__init__(self.name, db_action, engine)

    def do_pre_execute(self):
        """Notify the ONGOING state, then run the action's pre-condition.

        :returns: whatever the engine returned for the ONGOING notification
        """
        ongoing = objects.action.State.ONGOING
        notified_action = self.engine.notify(self._db_action, ongoing)
        LOG.debug("Pre-condition action: %s", self.name)
        self.action.pre_condition()
        return notified_action

    def do_execute(self, *args, **kwargs):
        """Run the action and notify its resulting state.

        :returns: the engine's result for the SUCCEEDED notification
        :raises: :class:`watcher.common.exception.ActionExecutionFailure`
                 when the action returns anything other than ``True``
        """
        LOG.debug("Running action: %s", self.name)

        # NOTE: Some actions (such as migrate) return None on exception,
        # so only a literal True counts as success.
        outcome = self.action.execute()
        if outcome is not True:
            self.engine.notify(self._db_action,
                               objects.action.State.FAILED)
            raise exception.ActionExecutionFailure(
                action_id=self._db_action.uuid)

        return self.engine.notify(self._db_action,
                                  objects.action.State.SUCCEEDED)

    def do_post_execute(self):
        """Run the action's post-condition."""
        LOG.debug("Post-condition action: %s", self.name)
        self.action.post_condition()

    def do_revert(self, *args, **kwargs):
        """Revert the action unless rollback is disabled in configuration.

        Any exception raised by the revert is logged and not propagated.
        """
        rollback_enabled = CONF.watcher_applier.rollback_when_actionplan_failed
        # NOTE: Not rollback action plan
        if not rollback_enabled:
            LOG.info("Failed actionplan rollback option is turned off, and "
                     "the following action will be skipped: %s", self.name)
            return

        LOG.warning("Revert action: %s", self.name)
        try:
            # TODO(jed): do we need to update the states in case of failure?
            self.action.revert()
        except Exception as exc:
            LOG.exception(exc)
            LOG.critical("Oops! We need a disaster recover plan.")

    def do_abort(self, *args, **kwargs):
        """Abort the action and notify the matching state.

        :returns: the engine's result for the notification: CANCELLED when
                  the abort returned a truthy value, SUCCEEDED when it did
                  not, FAILED when an exception was raised
        """
        LOG.warning("Aborting action: %s", self.name)
        try:
            aborted = self.action.abort()
            # A truthy result means the action was aborted.
            final_state = (objects.action.State.CANCELLED if aborted
                           else objects.action.State.SUCCEEDED)
            return self.engine.notify(self._db_action, final_state)
        except Exception as exc:
            LOG.exception(exc)
            return self.engine.notify(self._db_action,
                                      objects.action.State.FAILED)

195 

196 

class TaskFlowNop(flow_task.Task):
    """No-op task for workflows that contain only one Action.

    At least two atoms are required to create a link.
    """

    def execute(self):
        """Do nothing."""
        return None