Coverage for watcher/decision_engine/strategy/strategies/actuation.py: 93%

28 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 b<>com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17 

18from watcher._i18n import _ 

19from watcher.decision_engine.strategy.strategies import base 

20 

21 

22class Actuator(base.UnclassifiedStrategy): 

23 """Actuator 

24 

25 Actuator that simply executes the actions given as parameter 

26 

27 This strategy allows anyone to create an action plan with a predefined 

28 set of actions. This strategy can be used for 2 different purposes: 

29 

30 - Test actions 

31 - Use this strategy based on an event trigger to perform some explicit task 

32 

33 """ 

34 

35 @classmethod 

36 def get_name(cls): 

37 return "actuator" 

38 

39 @classmethod 

40 def get_display_name(cls): 

41 return _("Actuator") 

42 

43 @classmethod 

44 def get_translatable_display_name(cls): 

45 return "Actuator" 

46 

47 @classmethod 

48 def get_schema(cls): 

49 # Mandatory default setting for each element 

50 return { 

51 "$schema": "http://json-schema.org/draft-04/schema#", 

52 "type": "object", 

53 "properties": { 

54 "actions": { 

55 "type": "array", 

56 "items": { 

57 "type": "object", 

58 "properties": { 

59 "action_type": { 

60 "type": "string" 

61 }, 

62 "resource_id": { 

63 "type": "string" 

64 }, 

65 "input_parameters": { 

66 "type": "object", 

67 "properties": {}, 

68 "additionalProperties": True 

69 } 

70 }, 

71 "required": [ 

72 "action_type", "input_parameters" 

73 ], 

74 "additionalProperties": True, 

75 } 

76 } 

77 }, 

78 "required": [ 

79 "actions" 

80 ] 

81 } 

82 

83 @classmethod 

84 def get_config_opts(cls): 

85 """Override base class config options as do not use datasource """ 

86 

87 return [] 

88 

89 @property 

90 def actions(self): 

91 return self.input_parameters.get('actions', []) 

92 

93 def pre_execute(self): 

94 self._pre_execute() 

95 

96 def do_execute(self, audit=None): 

97 for action in self.actions: 

98 self.solution.add_action(**action) 

99 

100 def post_execute(self): 

101 pass