Coverage for watcher/decision_engine/strategy/context/base.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# 

4# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com> 

5# Vincent FRANCOISE <vincent.francoise@b-com.com> 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

16# implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19 

20import abc 

21 

22from watcher import notifications 

23from watcher.objects import fields 

24 

25 

26class StrategyContext(object, metaclass=abc.ABCMeta): 

27 

28 def execute_strategy(self, audit, request_context): 

29 """Execute the strategy for the given an audit 

30 

31 :param audit: Audit object 

32 :type audit: :py:class:`~.objects.audit.Audit` instance 

33 :param request_context: Current request context 

34 :type request_context: :py:class:`~.RequestContext` instance 

35 :returns: The computed solution 

36 :rtype: :py:class:`~.BaseSolution` instance 

37 """ 

38 try: 

39 notifications.audit.send_action_notification( 

40 request_context, audit, 

41 action=fields.NotificationAction.STRATEGY, 

42 phase=fields.NotificationPhase.START) 

43 solution = self.do_execute_strategy(audit, request_context) 

44 notifications.audit.send_action_notification( 

45 request_context, audit, 

46 action=fields.NotificationAction.STRATEGY, 

47 phase=fields.NotificationPhase.END) 

48 return solution 

49 except Exception: 

50 notifications.audit.send_action_notification( 

51 request_context, audit, 

52 action=fields.NotificationAction.STRATEGY, 

53 priority=fields.NotificationPriority.ERROR, 

54 phase=fields.NotificationPhase.ERROR) 

55 raise 

56 

57 @abc.abstractmethod 

58 def do_execute_strategy(self, audit, request_context): 

59 """Execute the strategy for the given an audit 

60 

61 :param audit: Audit object 

62 :type audit: :py:class:`~.objects.audit.Audit` instance 

63 :param request_context: Current request context 

64 :type request_context: :py:class:`~.RequestContext` instance 

65 :returns: The computed solution 

66 :rtype: :py:class:`~.BaseSolution` instance 

67 """ 

68 raise NotImplementedError()