Coverage for watcher/decision_engine/audit/base.py: 97%
77 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2015 b<>com
3#
4# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
5# Alexander Chadin <a.chadin@servionica.ru>
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16# implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19#
20import abc
22from oslo_config import cfg
23from oslo_log import log
25from watcher.applier import rpcapi
26from watcher.common import exception
27from watcher.common import service
28from watcher.decision_engine.loading import default as loader
29from watcher.decision_engine.strategy.context import default as default_context
30from watcher import notifications
31from watcher import objects
32from watcher.objects import fields
# Global oslo.config configuration object (read for ``CONF.host`` below).
CONF = cfg.CONF
# Logger named after this module.
LOG = log.getLogger(__name__)
class BaseMetaClass(service.Singleton, abc.ABCMeta):
    """Metaclass combining ``service.Singleton`` with ``abc.ABCMeta``.

    Classes created with this metaclass get both the behaviour provided by
    ``service.Singleton`` and abstract-method enforcement from ``ABCMeta``.
    """
class BaseAuditHandler(metaclass=BaseMetaClass):
    """Abstract interface that every audit handler has to implement."""

    @abc.abstractmethod
    def execute(self, audit, request_context):
        """Run the given audit.

        :param audit: the audit to run
        :param request_context: request context passed through to the steps
        """
        raise NotImplementedError

    @abc.abstractmethod
    def pre_execute(self, audit, request_context):
        """Hook for the work done before the audit is executed.

        :param audit: the audit about to run
        :param request_context: request context of the run
        """
        raise NotImplementedError

    @abc.abstractmethod
    def do_execute(self, audit, request_context):
        """Hook for the actual execution of the audit.

        :param audit: the audit being run
        :param request_context: request context of the run
        """
        raise NotImplementedError

    @abc.abstractmethod
    def post_execute(self, audit, solution, request_context):
        """Hook for the work done once a solution has been produced.

        :param audit: the audit that was run
        :param solution: the solution obtained for the audit
        :param request_context: request context of the run
        """
        raise NotImplementedError
class AuditHandler(BaseAuditHandler, metaclass=abc.ABCMeta):
    """Audit handler implementing the pre/do/post execution steps.

    ``execute`` chains ``pre_execute``, ``do_execute`` and ``post_execute``
    and records the outcome in the audit state.
    """

    def __init__(self):
        super().__init__()
        self._strategy_context = default_context.DefaultStrategyContext()
        self._planner_loader = loader.DefaultPlannerLoader()
        self.applier_client = rpcapi.ApplierAPI()

    def get_planner(self, solution):
        """Load and return the planner named by the solution's strategy.

        :param solution: solution whose ``strategy.planner`` names the
            planner to load
        :returns: whatever the planner loader returns for that name
        """
        # AuditHandler is a singleton, so to avoid a race condition the
        # planner is loaded anew on every call.
        name = solution.strategy.planner
        LOG.debug("Loading %s", name)
        return self._planner_loader.load(name=name)

    @property
    def strategy_context(self):
        """Strategy context created when the handler was initialised."""
        return self._strategy_context

    def do_execute(self, audit, request_context):
        """Execute the strategy for ``audit`` and return its solution.

        :param audit: the audit to run the strategy for
        :param request_context: request context of the run
        :returns: the result of ``strategy_context.execute_strategy``
        """
        return self.strategy_context.execute_strategy(audit, request_context)

    def do_schedule(self, request_context, audit, solution):
        """Schedule ``solution`` with its planner, sending notifications.

        A PLANNER action notification is sent with phase START before the
        planner is loaded and with phase END after scheduling. If anything
        in between raises, a notification with ERROR priority and ERROR
        phase is sent and the exception is re-raised.

        :param request_context: request context of the run
        :param audit: the audit the solution belongs to
        :param solution: the solution to schedule
        :returns: the action plan returned by ``planner.schedule``
        """

        def notify(phase, **extra):
            # Every notification of this method shares context, audit and
            # the PLANNER action; only phase (and priority) vary.
            notifications.audit.send_action_notification(
                request_context, audit,
                action=fields.NotificationAction.PLANNER,
                phase=phase, **extra)

        try:
            notify(fields.NotificationPhase.START)
            action_plan = self.get_planner(solution).schedule(
                request_context, audit.id, solution)
            notify(fields.NotificationPhase.END)
        except Exception:
            notify(fields.NotificationPhase.ERROR,
                   priority=fields.NotificationPriority.ERROR)
            raise
        return action_plan

    @staticmethod
    def update_audit_state(audit, state):
        """Set ``audit.state`` to ``state`` and save, if it differs.

        Nothing is written when the audit is already in ``state``.

        :param audit: the audit to update
        :param state: the state to move the audit to
        """
        if audit.state == state:
            return
        LOG.debug("Update audit state: %s", state)
        audit.state = state
        audit.save()

    @staticmethod
    def check_ongoing_action_plans(request_context):
        """Raise if any action plan is currently in the ONGOING state.

        :param request_context: request context used to list action plans
        :raises exception.ActionPlanIsOngoing: with the uuid of the first
            ongoing action plan found
        """
        ongoing = objects.ActionPlan.list(
            request_context,
            filters={'state': objects.action_plan.State.ONGOING})
        if ongoing:
            raise exception.ActionPlanIsOngoing(action_plan=ongoing[0].uuid)

    def pre_execute(self, audit, request_context):
        """Prepare ``audit`` for execution and mark it ONGOING.

        :param audit: the audit about to run
        :param request_context: request context of the run
        :raises exception.ActionPlanIsOngoing: if ``audit.force`` is false
            and an action plan is ongoing
        """
        LOG.debug("Trigger audit %s", audit.uuid)
        # A forced audit is executed even if an action plan is ongoing.
        if not audit.force:
            self.check_ongoing_action_plans(request_context)
        # Record the host that is going to execute this audit.
        audit.hostname = CONF.host
        self.update_audit_state(audit, objects.audit.State.ONGOING)

    def post_execute(self, audit, solution, request_context):
        """Schedule the solution and, if requested, launch its action plan.

        The action plan is handed to the applier only when
        ``audit.auto_trigger`` is true.

        :param audit: the audit that was run
        :param solution: the solution produced for the audit
        :param request_context: request context of the run
        """
        plan = self.do_schedule(request_context, audit, solution)
        if audit.auto_trigger:
            self.applier_client.launch_action_plan(request_context, plan.uuid)

    def execute(self, audit, request_context):
        """Run the pre/do/post steps for ``audit`` and record failures.

        Exceptions raised by the steps are handled here:
        ``ActionPlanIsOngoing`` is logged as a warning and moves a ONESHOT
        audit to CANCELLED; any other ``Exception`` is logged and moves the
        audit to FAILED.

        :param audit: the audit to run
        :param request_context: request context of the run
        """
        try:
            self.pre_execute(audit, request_context)
            solution = self.do_execute(audit, request_context)
            self.post_execute(audit, solution, request_context)
        except exception.ActionPlanIsOngoing as ongoing_err:
            LOG.warning(ongoing_err)
            is_oneshot = (
                audit.audit_type == objects.audit.AuditType.ONESHOT.value)
            if is_oneshot:
                self.update_audit_state(
                    audit, objects.audit.State.CANCELLED)
        except Exception as err:
            LOG.exception(err)
            self.update_audit_state(audit, objects.audit.State.FAILED)