Coverage for watcher/decision_engine/planner/weight.py: 90%

85 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# 

3# Authors: Vincent Francoise <Vincent.FRANCOISE@b-com.com> 

4# Alexander Chadin <a.chadin@servionica.ru> 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

14# implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18import collections 

19 

20import networkx as nx 

21from oslo_config import cfg 

22from oslo_log import log 

23 

24from watcher.common import utils 

25from watcher.decision_engine.planner import base 

26from watcher import objects 

27 

28LOG = log.getLogger(__name__) 

29 

30 

class WeightPlanner(base.BasePlanner):
    """Weight planner implementation

    This implementation builds actions with parents in accordance with weights.
    Set of actions having a higher weight will be scheduled before
    the other ones. There are two config options to configure:
    weights and parallelization.

    *Limitations*

    - This planner requires to have weights and parallelization configs
      tuned well.
    - Two action types cannot share the same weight: the weight is used to
      look up the action type (and so its parallelization) when the graph
      is built.
    """

    def __init__(self, config):
        """Initialize the planner.

        :param config: planner configuration, forwarded unchanged to the
                       parent class constructor
        """
        super(WeightPlanner, self).__init__(config)

    # Default value of the 'weights' option: action type -> weight.
    # Groups of actions are ordered by descending weight.
    action_weights = {
        'nop': 70,
        'volume_migrate': 60,
        'change_nova_service_state': 50,
        'sleep': 40,
        'migrate': 30,
        'resize': 20,
        'turn_host_to_acpi_s3_state': 10,
        'change_node_power_state': 9,
    }

    # Default value of the 'parallelization' option: action type -> size of
    # the chunks (layers) the actions of that type are split into.
    parallelization = {
        'turn_host_to_acpi_s3_state': 2,
        'resize': 2,
        'migrate': 2,
        'sleep': 1,
        'change_nova_service_state': 1,
        'nop': 1,
        'change_node_power_state': 2,
        'volume_migrate': 2,
    }

    @classmethod
    def get_config_opts(cls):
        """Return the configuration options declared by this planner.

        :returns: list holding the ``weights`` and ``parallelization``
                  ``cfg.DictOpt`` options, whose defaults are the class-level
                  ``action_weights`` and ``parallelization`` dicts
        """
        return [
            cfg.DictOpt(
                'weights',
                help="These weights are used to schedule the actions. "
                     "Action Plan will be built in accordance with sets of "
                     "actions ordered by descending weights. "
                     "Two action types cannot have the same weight.",
                default=cls.action_weights),
            cfg.DictOpt(
                'parallelization',
                help="Number of actions to be run in parallel on a per "
                     "action type basis.",
                default=cls.parallelization),
        ]

    @staticmethod
    def chunkify(lst, n):
        """Yield successive n-sized chunks from lst.

        e.g. chunkify([0, 1, 2, 3, 4], 2) -> [0, 1], [2, 3], [4]

        :param lst: sequence to split; must support ``len()`` and slicing
        :param n: chunk size, converted with ``int()`` (so a numeric string
                  is accepted); any value lower than 1 is treated as 1
        :returns: generator of slices of ``lst``; the last one may be shorter
        """
        # Clamp to 1 so that range() never gets a zero or negative step.
        n = max(int(n), 1)

        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def compute_action_graph(self, sorted_weighted_actions):
        """Build the directed graph ordering the actions to execute.

        Every group of actions is split into layers of at most
        ``parallelization[action_type]`` actions. Each action of a layer gets
        every action of the previous layer as parent, the previous layer
        being either the preceding chunk of the same group or the last chunk
        of the preceding group. With ``migrate`` (parallelization 2) holding
        three actions followed by ``resize`` holding one, this gives:

            migrate-1 --\
                         >--> migrate-3 --> resize-1
            migrate-2 --/

        :param sorted_weighted_actions: iterable of ``(weight, actions)``
            pairs in scheduling order, as returned by
            :meth:`get_sorted_actions_by_weight`
        :returns: ``nx.DiGraph`` whose nodes are the actions and whose edges
                  go from a parent to its child; the ``parents`` list of
                  every action is filled with the uuids of its parents
        :raises KeyError: if a weight cannot be mapped back to an action type
                          or this type has no ``parallelization`` entry
        """
        # Map each weight back to its action type; this is why two action
        # types must not share the same weight.
        reverse_weights = {v: k for k, v in self.config.weights.items()}
        # Actions of the most recently processed layer. It is empty for the
        # very first layer, which therefore gets no parent.
        previous_layer = []
        action_graph = nx.DiGraph()

        for weight, actions in sorted_weighted_actions:
            action_chunks = self.chunkify(
                actions, self.config.parallelization[reverse_weights[weight]])

            for actions_chunk in action_chunks:
                for action in actions_chunk:
                    action_graph.add_node(action)
                    for parent_node in previous_layer:
                        action_graph.add_edge(parent_node, action)
                        action.parents.append(parent_node.uuid)

                # The chunk just inserted becomes the parent layer of the
                # next one.
                previous_layer = list(actions_chunk)

        return action_graph

    def schedule(self, context, audit_id, solution):
        """Create the action plan of an audit together with its actions.

        The action plan is created first, then the actions of the solution
        are ordered by weight and linked in a graph, the efficacy indicators
        are created and finally every action of the graph is created.

        :param context: request context handed to the created objects
        :param audit_id: identifier of the audit the plan is built for
        :param solution: solution providing ``actions``,
                         ``efficacy_indicators``, ``global_efficacy`` and
                         ``strategy.name``
        :returns: the created action plan
        """
        LOG.debug('Creating an action plan for the audit uuid: %s', audit_id)
        action_plan = self.create_action_plan(context, audit_id, solution)

        sorted_weighted_actions = self.get_sorted_actions_by_weight(
            context, action_plan, solution)
        action_graph = self.compute_action_graph(sorted_weighted_actions)

        self._create_efficacy_indicators(
            context, action_plan.id, solution.efficacy_indicators)

        if len(action_graph.nodes()) == 0:
            # Nothing to execute: the plan is directly flagged as succeeded.
            LOG.warning("The action plan is empty")
            action_plan.state = objects.action_plan.State.SUCCEEDED
            action_plan.save()

        self.create_scheduled_actions(action_graph)
        return action_plan

    def get_sorted_actions_by_weight(self, context, action_plan, solution):
        """Group the actions of a solution by weight, heaviest group first.

        :param context: request context handed to the ``objects.Action``
        :param action_plan: action plan the actions are attached to
        :param solution: solution whose ``actions`` are mappings expanded as
                         keyword arguments of ``objects.Action``
        :returns: iterator over ``(weight, actions)`` pairs ordered by
                  descending weight; ``weight`` is the value found in the
                  ``weights`` option, left unconverted
        :raises KeyError: if an action type has no entry in ``weights``
        :raises ValueError: if a weight is not numeric
        """
        # Build Action objects that can be used as nodes of the graph.
        action_objects = [
            objects.Action(
                context, uuid=utils.generate_uuid(), parents=[],
                action_plan_id=action_plan.id, **a)
            for a in solution.actions]
        # This is a dict of list with each key being a weight and the list
        # being all the actions associated to this weight
        weighted_actions = collections.defaultdict(list)
        for action in action_objects:
            action_weight = self.config.weights[action.action_type]
            weighted_actions[action_weight].append(action)

        # DictOpt values coming from a configuration file are strings, so
        # compare the weights numerically: as plain strings "9" would be
        # ordered before "70". The keys themselves are kept untouched.
        return reversed(sorted(
            weighted_actions.items(), key=lambda item: float(item[0])))

    def create_scheduled_actions(self, graph):
        """Create every action held by the graph.

        :param graph: graph whose nodes are the actions to create
        :raises Exception: whatever ``action.create()`` raised, logged and
                           re-raised unchanged
        """
        for action in graph.nodes():
            LOG.debug("Creating the %s in the Watcher database",
                      action.action_type)
            try:
                action.create()
            except Exception as exc:
                LOG.exception(exc)
                raise

    def create_action_plan(self, context, audit_id, solution):
        """Create the action plan of an audit in the RECOMMENDED state.

        :param context: request context
        :param audit_id: identifier of the audit the plan belongs to
        :param solution: solution providing ``strategy.name`` and
                         ``global_efficacy``
        :returns: the ``objects.ActionPlan`` on which ``create()`` was called
        """
        strategy = objects.Strategy.get_by_name(
            context, solution.strategy.name)

        action_plan_dict = {
            'uuid': utils.generate_uuid(),
            'audit_id': audit_id,
            'strategy_id': strategy.id,
            'state': objects.action_plan.State.RECOMMENDED,
            'global_efficacy': solution.global_efficacy,
        }

        new_action_plan = objects.ActionPlan(context, **action_plan_dict)
        new_action_plan.create()

        return new_action_plan

    def _create_efficacy_indicators(self, context, action_plan_id, indicators):
        """Create one efficacy indicator object per indicator.

        :param context: request context
        :param action_plan_id: identifier of the owning action plan
        :param indicators: iterable of objects exposing ``name``,
                           ``description``, ``unit`` and ``value``
        :returns: list of the created ``objects.EfficacyIndicator``
        """
        efficacy_indicators = []
        for indicator in indicators:
            efficacy_indicator_dict = {
                'uuid': utils.generate_uuid(),
                'name': indicator.name,
                'description': indicator.description,
                'unit': indicator.unit,
                'value': indicator.value,
                'action_plan_id': action_plan_id,
            }
            new_efficacy_indicator = objects.EfficacyIndicator(
                context, **efficacy_indicator_dict)
            new_efficacy_indicator.create()

            efficacy_indicators.append(new_efficacy_indicator)
        return efficacy_indicators