Coverage for watcher/decision_engine/strategy/context/default.py: 97%

29 statements  

Generated by coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16from oslo_log import log 

17 

18from watcher.common import clients 

19from watcher.common import utils 

20from watcher.decision_engine.strategy.context import base 

21from watcher.decision_engine.strategy.selection import default 

22 

23from watcher import objects 

24 

25LOG = log.getLogger(__name__) 

26 

27 

class DefaultStrategyContext(base.StrategyContext):
    """Strategy context that selects a strategy for an audit and runs it."""

    def __init__(self):
        super(DefaultStrategyContext, self).__init__()
        LOG.debug("Initializing Strategy Context")

    @staticmethod
    def select_strategy(audit, request_context):
        """Pick the strategy to run for ``audit`` via the default selector.

        :param audit: audit object; its ``goal_id`` and ``strategy_id``
                      attributes are read here
        :param request_context: context passed to the ``Goal`` and
                                ``Strategy`` lookups
        :returns: the result of ``DefaultStrategySelector.select()``
        """
        openstack_clients = clients.OpenStackClients()
        # TODO(jed): retrieve in audit parameters (threshold,...)
        # TODO(jed): create ActionPlan

        audit_goal = objects.Goal.get_by_id(request_context, audit.goal_id)

        # NOTE(jed56): 'strategy_id' is optional on the audit object. An
        # admin who wants to force a given Strategy can reference it from
        # the Audit; otherwise no strategy name is handed to the selector.
        if audit.strategy_id:
            forced_strategy = objects.Strategy.get_by_id(
                request_context, audit.strategy_id)
            forced_name = forced_strategy.name
        else:
            forced_name = None

        return default.DefaultStrategySelector(
            goal_name=audit_goal.name,
            strategy_name=forced_name,
            osc=openstack_clients,
        ).select()

    def do_execute_strategy(self, audit, request_context):
        """Select a strategy for ``audit``, configure it and execute it.

        :param audit: audit object; its ``scope`` and ``parameters``
                      attributes are read here
        :param request_context: context forwarded to ``select_strategy``
        :returns: the result of the selected strategy's ``execute()``
        """
        strategy = self.select_strategy(audit, request_context)
        strategy.audit_scope = audit.scope

        parameter_schema = strategy.get_schema()
        validate_empty_parameters = not audit.parameters and parameter_schema
        if validate_empty_parameters:
            # Default value feedback if no predefined strategy.
            # NOTE(review): presumably the validator fills the schema
            # defaults into audit.parameters in place -- confirm against
            # watcher.common.utils.
            validator = utils.StrictDefaultValidatingDraft4Validator(
                parameter_schema)
            validator.validate(audit.parameters)

        # Copy the audit parameters onto the strategy's input parameters.
        audit_parameters = dict(audit.parameters.items())
        strategy.input_parameters.update(audit_parameters)

        return strategy.execute(audit=audit)

69 

70 return selected_strategy.execute(audit=audit)