Coverage for watcher/decision_engine/audit/base.py: 97%

77 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# 

4# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com> 

5# Alexander Chadin <a.chadin@servionica.ru> 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

16# implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# 

20import abc 

21 

22from oslo_config import cfg 

23from oslo_log import log 

24 

25from watcher.applier import rpcapi 

26from watcher.common import exception 

27from watcher.common import service 

28from watcher.decision_engine.loading import default as loader 

29from watcher.decision_engine.strategy.context import default as default_context 

30from watcher import notifications 

31from watcher import objects 

32from watcher.objects import fields 

33 

34CONF = cfg.CONF 

35LOG = log.getLogger(__name__) 

36 

37 

class BaseMetaClass(service.Singleton, abc.ABCMeta):
    """Metaclass mixing ``service.Singleton`` into ``abc.ABCMeta``.

    Classes created with it can declare ``abc`` abstract methods while also
    inheriting whatever ``service.Singleton`` contributes (presumably a
    single shared instance per class -- confirm in watcher.common.service).
    """

40 

41 

class BaseAuditHandler(metaclass=BaseMetaClass):
    """Abstract interface for audit handlers.

    Every hook below is declared abstract; the default bodies do nothing
    except raise ``NotImplementedError``, so concrete handlers have to
    override all four of them.
    """

    @abc.abstractmethod
    def execute(self, audit, request_context):
        """Entry point for handling ``audit``; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    def pre_execute(self, audit, request_context):
        """Hook taking ``audit`` and ``request_context``; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    def do_execute(self, audit, request_context):
        """Hook taking ``audit`` and ``request_context``; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    def post_execute(self, audit, solution, request_context):
        """Hook that also receives a ``solution``; must be overridden."""
        raise NotImplementedError

59 

60 

class AuditHandler(BaseAuditHandler, metaclass=abc.ABCMeta):
    """Audit handler that runs a strategy, schedules it and may launch it.

    ``execute`` chains ``pre_execute``, ``do_execute`` and ``post_execute``
    and updates the audit state when one of those steps raises.
    """

    def __init__(self):
        super().__init__()
        self._strategy_context = default_context.DefaultStrategyContext()
        self._planner_loader = loader.DefaultPlannerLoader()
        self.applier_client = rpcapi.ApplierAPI()

    def get_planner(self, solution):
        """Load the planner named by ``solution.strategy.planner``.

        AuditHandler is a singleton, so the planner is loaded afresh on
        every call rather than kept on the instance; this avoids a race
        condition.
        """
        name = solution.strategy.planner
        LOG.debug("Loading %s", name)
        return self._planner_loader.load(name=name)

    @property
    def strategy_context(self):
        """The strategy context created in ``__init__``."""
        return self._strategy_context

    def do_execute(self, audit, request_context):
        """Execute the strategy for ``audit`` and return the solution."""
        return self.strategy_context.execute_strategy(audit, request_context)

    def do_schedule(self, request_context, audit, solution):
        """Have the planner schedule ``solution`` and return the action plan.

        A PLANNER notification is sent with phase START before scheduling
        and with phase END afterwards. If any of these steps raises, a
        PLANNER notification with ERROR phase and ERROR priority is sent
        and the original exception is re-raised.
        """
        def notify(phase, **extra):
            # Every notification here shares the context, audit and action.
            notifications.audit.send_action_notification(
                request_context, audit,
                action=fields.NotificationAction.PLANNER,
                phase=phase, **extra)

        try:
            notify(fields.NotificationPhase.START)
            scheduled_plan = self.get_planner(solution).schedule(
                request_context, audit.id, solution)
            notify(fields.NotificationPhase.END)
        except Exception:
            notify(fields.NotificationPhase.ERROR,
                   priority=fields.NotificationPriority.ERROR)
            raise
        return scheduled_plan

    @staticmethod
    def update_audit_state(audit, state):
        """Set ``audit.state`` to ``state`` and save, unless already equal."""
        if audit.state == state:
            return
        LOG.debug("Update audit state: %s", state)
        audit.state = state
        audit.save()

    @staticmethod
    def check_ongoing_action_plans(request_context):
        """Raise ``ActionPlanIsOngoing`` when an ONGOING action plan exists.

        The raised exception is given the uuid of the first plan returned
        by the listing.
        """
        ongoing = objects.ActionPlan.list(
            request_context,
            filters={'state': objects.action_plan.State.ONGOING})
        if not ongoing:
            return
        raise exception.ActionPlanIsOngoing(action_plan=ongoing[0].uuid)

    def pre_execute(self, audit, request_context):
        """Check for ongoing plans, stamp the host and mark audit ONGOING."""
        LOG.debug("Trigger audit %s", audit.uuid)
        # A forced audit is executed even if an action plan is ongoing.
        if not audit.force:
            self.check_ongoing_action_plans(request_context)
        # Record the hostname that is going to execute this audit.
        audit.hostname = CONF.host
        self.update_audit_state(audit, objects.audit.State.ONGOING)

    def post_execute(self, audit, solution, request_context):
        """Schedule ``solution``; launch the plan if ``audit.auto_trigger``."""
        scheduled_plan = self.do_schedule(request_context, audit, solution)
        if not audit.auto_trigger:
            return
        self.applier_client.launch_action_plan(request_context,
                                               scheduled_plan.uuid)

    def execute(self, audit, request_context):
        """Run the pre/do/post pipeline, recording failures on the audit.

        ``ActionPlanIsOngoing`` is logged as a warning and, for ONESHOT
        audits only, the audit is moved to CANCELLED. Any other exception
        is logged with its traceback and the audit is moved to FAILED.
        Neither kind of exception propagates to the caller.
        """
        try:
            self.pre_execute(audit, request_context)
            solution = self.do_execute(audit, request_context)
            self.post_execute(audit, solution, request_context)
        except exception.ActionPlanIsOngoing as err:
            LOG.warning(err)
            if audit.audit_type == objects.audit.AuditType.ONESHOT.value:
                self.update_audit_state(audit, objects.audit.State.CANCELLED)
        except Exception as err:
            LOG.exception(err)
            self.update_audit_state(audit, objects.audit.State.FAILED)