Coverage for watcher/decision_engine/strategy/context/base.py: 100%
13 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2015 b<>com
3#
4# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
5# Vincent FRANCOISE <vincent.francoise@b-com.com>
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16# implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
20import abc
22from watcher import notifications
23from watcher.objects import fields
26class StrategyContext(object, metaclass=abc.ABCMeta):
28 def execute_strategy(self, audit, request_context):
29 """Execute the strategy for the given an audit
31 :param audit: Audit object
32 :type audit: :py:class:`~.objects.audit.Audit` instance
33 :param request_context: Current request context
34 :type request_context: :py:class:`~.RequestContext` instance
35 :returns: The computed solution
36 :rtype: :py:class:`~.BaseSolution` instance
37 """
38 try:
39 notifications.audit.send_action_notification(
40 request_context, audit,
41 action=fields.NotificationAction.STRATEGY,
42 phase=fields.NotificationPhase.START)
43 solution = self.do_execute_strategy(audit, request_context)
44 notifications.audit.send_action_notification(
45 request_context, audit,
46 action=fields.NotificationAction.STRATEGY,
47 phase=fields.NotificationPhase.END)
48 return solution
49 except Exception:
50 notifications.audit.send_action_notification(
51 request_context, audit,
52 action=fields.NotificationAction.STRATEGY,
53 priority=fields.NotificationPriority.ERROR,
54 phase=fields.NotificationPhase.ERROR)
55 raise
57 @abc.abstractmethod
58 def do_execute_strategy(self, audit, request_context):
59 """Execute the strategy for the given an audit
61 :param audit: Audit object
62 :type audit: :py:class:`~.objects.audit.Audit` instance
63 :param request_context: Current request context
64 :type request_context: :py:class:`~.RequestContext` instance
65 :returns: The computed solution
66 :rtype: :py:class:`~.BaseSolution` instance
67 """
68 raise NotImplementedError()