Coverage for watcher/decision_engine/strategy/strategies/actuation.py: 93%
28 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2017 b<>com
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
18from watcher._i18n import _
19from watcher.decision_engine.strategy.strategies import base
class Actuator(base.UnclassifiedStrategy):
    """Actuator

    Strategy that simply schedules the actions handed to it as parameters.

    It lets anyone create an action plan out of a predefined set of
    actions, which can serve 2 different purposes:

    - Test actions
    - Use this strategy based on an event trigger to perform some explicit
      task

    """

    @classmethod
    def get_name(cls):
        """Return the name of this strategy."""
        return "actuator"

    @classmethod
    def get_display_name(cls):
        """Return the display name, passed through the ``_`` i18n helper."""
        return _("Actuator")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the display name as a plain, untranslated string."""
        return "Actuator"

    @classmethod
    def get_schema(cls):
        """Return the JSON schema (draft-04) describing the input parameters.

        The parameters must be an object with a mandatory ``actions`` array.
        Every entry of that array is an object that requires ``action_type``
        (string) and ``input_parameters`` (object with arbitrary content),
        may carry a ``resource_id`` (string), and accepts additional keys.
        """
        # Mandatory default setting for each element
        # "input_parameters" is deliberately free-form: no declared
        # properties, anything allowed.
        free_form_parameters = {
            "type": "object",
            "properties": {},
            "additionalProperties": True,
        }
        action_item = {
            "type": "object",
            "properties": {
                "action_type": {"type": "string"},
                "resource_id": {"type": "string"},
                "input_parameters": free_form_parameters,
            },
            "required": ["action_type", "input_parameters"],
            "additionalProperties": True,
        }
        return {
            "$schema": "http://json-schema.org/draft-04/schema#",
            "type": "object",
            "properties": {
                "actions": {"type": "array", "items": action_item},
            },
            "required": ["actions"],
        }

    @classmethod
    def get_config_opts(cls):
        """Return an empty list of configuration options.

        Overrides the base class options because this strategy does not use
        a datasource.
        """
        return list()

    @property
    def actions(self):
        """Return the ``actions`` input parameter, or ``[]`` when absent."""
        parameters = self.input_parameters
        return parameters.get('actions', [])

    def pre_execute(self):
        """Run the shared ``_pre_execute`` hook (defined outside this class)."""
        self._pre_execute()

    def do_execute(self, audit=None):
        """Add every requested action to the solution.

        Each entry of ``self.actions`` is unpacked as keyword arguments to
        ``self.solution.add_action``. The ``audit`` argument is accepted but
        not used here.
        """
        for action_kwargs in self.actions:
            self.solution.add_action(**action_kwargs)

    def post_execute(self):
        """Do nothing; this strategy has no post-execution step."""