Coverage for watcher/applier/workflow_engine/base.py: 85%

145 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 b<>com 

3# 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

14# implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19import abc 

20import time 

21 

22import eventlet 

23 

24from oslo_log import log 

25from taskflow import task as flow_task 

26 

27from watcher.applier.actions import factory 

28from watcher.common import clients 

29from watcher.common import exception 

30from watcher.common.loader import loadable 

31from watcher import notifications 

32from watcher import objects 

33from watcher.objects import fields 

34 

35 

36LOG = log.getLogger(__name__) 

37 

38CANCEL_STATE = [objects.action_plan.State.CANCELLING, 

39 objects.action_plan.State.CANCELLED] 

40 

41 

42class BaseWorkFlowEngine(loadable.Loadable, metaclass=abc.ABCMeta): 

43 

44 def __init__(self, config, context=None, applier_manager=None): 

45 """Constructor 

46 

47 :param config: A mapping containing the configuration of this 

48 workflow engine 

49 :type config: dict 

50 :param osc: an OpenStackClients object, defaults to None 

51 :type osc: :py:class:`~.OpenStackClients` instance, optional 

52 """ 

53 super(BaseWorkFlowEngine, self).__init__(config) 

54 self._context = context 

55 self._applier_manager = applier_manager 

56 self._action_factory = factory.ActionFactory() 

57 self._osc = None 

58 self._is_notified = False 

59 self.execution_rule = None 

60 

61 @classmethod 

62 def get_config_opts(cls): 

63 """Defines the configuration options to be associated to this loadable 

64 

65 :return: A list of configuration options relative to this Loadable 

66 :rtype: list of :class:`oslo_config.cfg.Opt` instances 

67 """ 

68 return [] 

69 

70 @property 

71 def context(self): 

72 return self._context 

73 

74 @property 

75 def osc(self): 

76 if not self._osc: 

77 self._osc = clients.OpenStackClients() 

78 return self._osc 

79 

80 @property 

81 def applier_manager(self): 

82 return self._applier_manager 

83 

84 @property 

85 def action_factory(self): 

86 return self._action_factory 

87 

88 def notify(self, action, state): 

89 db_action = objects.Action.get_by_uuid(self.context, action.uuid, 

90 eager=True) 

91 db_action.state = state 

92 db_action.save() 

93 return db_action 

94 

95 def notify_cancel_start(self, action_plan_uuid): 

96 action_plan = objects.ActionPlan.get_by_uuid(self.context, 

97 action_plan_uuid, 

98 eager=True) 

99 if not self._is_notified: 

100 self._is_notified = True 

101 notifications.action_plan.send_cancel_notification( 

102 self._context, action_plan, 

103 action=fields.NotificationAction.CANCEL, 

104 phase=fields.NotificationPhase.START) 

105 

106 @abc.abstractmethod 

107 def execute(self, actions): 

108 raise NotImplementedError() 

109 

110 

111class BaseTaskFlowActionContainer(flow_task.Task): 

112 

113 def __init__(self, name, db_action, engine, **kwargs): 

114 super(BaseTaskFlowActionContainer, self).__init__(name=name) 

115 self._db_action = db_action 

116 self._engine = engine 

117 self.loaded_action = None 

118 

119 @property 

120 def engine(self): 

121 return self._engine 

122 

123 @property 

124 def action(self): 

125 if self.loaded_action is None: 

126 action = self.engine.action_factory.make_action( 

127 self._db_action, 

128 osc=self._engine.osc) 

129 self.loaded_action = action 

130 return self.loaded_action 

131 

132 @abc.abstractmethod 

133 def do_pre_execute(self): 

134 raise NotImplementedError() 

135 

136 @abc.abstractmethod 

137 def do_execute(self, *args, **kwargs): 

138 raise NotImplementedError() 

139 

140 @abc.abstractmethod 

141 def do_post_execute(self): 

142 raise NotImplementedError() 

143 

144 @abc.abstractmethod 

145 def do_revert(self): 

146 raise NotImplementedError() 

147 

148 @abc.abstractmethod 

149 def do_abort(self, *args, **kwargs): 

150 raise NotImplementedError() 

151 

152 # NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute, 

153 # post_execute) independently. We want to support notifications in base 

154 # class, so child's methods should be named with `do_` prefix and wrapped. 

155 def pre_execute(self): 

156 try: 

157 # NOTE(adisky): check the state of action plan before starting 

158 # next action, if action plan is cancelled raise the exceptions 

159 # so that taskflow does not schedule further actions. 

160 action_plan = objects.ActionPlan.get_by_id( 

161 self.engine.context, self._db_action.action_plan_id) 

162 if action_plan.state in CANCEL_STATE: 

163 raise exception.ActionPlanCancelled(uuid=action_plan.uuid) 

164 db_action = self.do_pre_execute() 

165 notifications.action.send_execution_notification( 

166 self.engine.context, db_action, 

167 fields.NotificationAction.EXECUTION, 

168 fields.NotificationPhase.START) 

169 except exception.ActionPlanCancelled as e: 

170 LOG.exception(e) 

171 self.engine.notify_cancel_start(action_plan.uuid) 

172 raise 

173 except Exception as e: 

174 LOG.exception(e) 

175 db_action = self.engine.notify(self._db_action, 

176 objects.action.State.FAILED) 

177 notifications.action.send_execution_notification( 

178 self.engine.context, db_action, 

179 fields.NotificationAction.EXECUTION, 

180 fields.NotificationPhase.ERROR, 

181 priority=fields.NotificationPriority.ERROR) 

182 

183 def execute(self, *args, **kwargs): 

184 def _do_execute_action(*args, **kwargs): 

185 try: 

186 db_action = self.do_execute(*args, **kwargs) 

187 notifications.action.send_execution_notification( 

188 self.engine.context, db_action, 

189 fields.NotificationAction.EXECUTION, 

190 fields.NotificationPhase.END) 

191 except Exception as e: 

192 LOG.exception(e) 

193 LOG.error('The workflow engine has failed ' 

194 'to execute the action: %s', self.name) 

195 db_action = self.engine.notify(self._db_action, 

196 objects.action.State.FAILED) 

197 notifications.action.send_execution_notification( 

198 self.engine.context, db_action, 

199 fields.NotificationAction.EXECUTION, 

200 fields.NotificationPhase.ERROR, 

201 priority=fields.NotificationPriority.ERROR) 

202 raise 

203 # NOTE: spawn a new thread for action execution, so that if action plan 

204 # is cancelled workflow engine will not wait to finish action execution 

205 et = eventlet.spawn(_do_execute_action, *args, **kwargs) 

206 # NOTE: check for the state of action plan periodically,so that if 

207 # action is finished or action plan is cancelled we can exit from here. 

208 result = False 

209 while True: 

210 action_object = objects.Action.get_by_uuid( 

211 self.engine.context, self._db_action.uuid, eager=True) 

212 action_plan_object = objects.ActionPlan.get_by_id( 

213 self.engine.context, action_object.action_plan_id) 

214 if action_object.state == objects.action.State.SUCCEEDED: 

215 result = True 

216 if (action_object.state in [objects.action.State.SUCCEEDED, 

217 objects.action.State.FAILED] or 

218 action_plan_object.state in CANCEL_STATE): 

219 break 

220 time.sleep(1) 

221 try: 

222 # NOTE: kill the action execution thread, if action plan is 

223 # cancelled for all other cases wait for the result from action 

224 # execution thread. 

225 # Not all actions support abort operations, kill only those action 

226 # which support abort operations 

227 abort = self.action.check_abort() 

228 if (action_plan_object.state in CANCEL_STATE and abort): 

229 et.kill() 

230 et.wait() 

231 return result 

232 

233 # NOTE: catch the greenlet exit exception due to thread kill, 

234 # taskflow will call revert for the action, 

235 # we will redirect it to abort. 

236 except eventlet.greenlet.GreenletExit: 

237 self.engine.notify_cancel_start(action_plan_object.uuid) 

238 raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid) 

239 

240 except Exception as e: 

241 LOG.exception(e) 

242 # return False instead of raising an exception 

243 return False 

244 

245 def post_execute(self): 

246 try: 

247 self.do_post_execute() 

248 except Exception as e: 

249 LOG.exception(e) 

250 db_action = self.engine.notify(self._db_action, 

251 objects.action.State.FAILED) 

252 notifications.action.send_execution_notification( 

253 self.engine.context, db_action, 

254 fields.NotificationAction.EXECUTION, 

255 fields.NotificationPhase.ERROR, 

256 priority=fields.NotificationPriority.ERROR) 

257 

258 def revert(self, *args, **kwargs): 

259 action_plan = objects.ActionPlan.get_by_id( 

260 self.engine.context, self._db_action.action_plan_id, eager=True) 

261 # NOTE: check if revert cause by cancel action plan or 

262 # some other exception occurred during action plan execution 

263 # if due to some other exception keep the flow intact. 

264 if action_plan.state not in CANCEL_STATE: 

265 self.do_revert() 

266 return 

267 

268 action_object = objects.Action.get_by_uuid( 

269 self.engine.context, self._db_action.uuid, eager=True) 

270 try: 

271 if action_object.state == objects.action.State.ONGOING: 271 ↛ 285line 271 didn't jump to line 285 because the condition on line 271 was always true

272 action_object.state = objects.action.State.CANCELLING 

273 action_object.save() 

274 notifications.action.send_cancel_notification( 

275 self.engine.context, action_object, 

276 fields.NotificationAction.CANCEL, 

277 fields.NotificationPhase.START) 

278 action_object = self.abort() 

279 

280 notifications.action.send_cancel_notification( 

281 self.engine.context, action_object, 

282 fields.NotificationAction.CANCEL, 

283 fields.NotificationPhase.END) 

284 

285 if action_object.state == objects.action.State.PENDING: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 notifications.action.send_cancel_notification( 

287 self.engine.context, action_object, 

288 fields.NotificationAction.CANCEL, 

289 fields.NotificationPhase.START) 

290 action_object.state = objects.action.State.CANCELLED 

291 action_object.save() 

292 notifications.action.send_cancel_notification( 

293 self.engine.context, action_object, 

294 fields.NotificationAction.CANCEL, 

295 fields.NotificationPhase.END) 

296 

297 except Exception as e: 

298 LOG.exception(e) 

299 action_object.state = objects.action.State.FAILED 

300 action_object.save() 

301 notifications.action.send_cancel_notification( 

302 self.engine.context, action_object, 

303 fields.NotificationAction.CANCEL, 

304 fields.NotificationPhase.ERROR, 

305 priority=fields.NotificationPriority.ERROR) 

306 

307 def abort(self, *args, **kwargs): 

308 return self.do_abort(*args, **kwargs)