Coverage for watcher/decision_engine/strategy/strategies/node_resource_consolidation.py: 92%

154 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2019 ZTE Corporation 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17 

18from oslo_log import log 

19 

20from watcher._i18n import _ 

21from watcher.common import exception 

22from watcher.decision_engine.model import element 

23from watcher.decision_engine.strategy.strategies import base 

24from watcher import objects 

25 

# Module-level logger for this strategy module.
LOG = log.getLogger(__name__)

27 

28 

class NodeResourceConsolidation(base.ServerConsolidationBaseStrategy):
    """consolidating resources on nodes using server migration

    *Description*

    This strategy checks the resource usages of compute nodes, if the used
    resources are less than total, it will try to migrate server to
    consolidate the use of resource.

    *Requirements*

    * You must have at least 2 compute nodes to run
      this strategy.
    * Hardware: compute nodes should use the same physical CPUs/RAMs

    *Limitations*

    * This is a proof of concept that is not meant to be used in production
    * It assume that live migrations are possible

    *Spec URL*

    http://specs.openstack.org/openstack/watcher-specs/specs/train/implemented/node-resource-consolidation.html
    """

    # Action type used to enable/disable a nova-compute service.
    CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
    # Reason recorded in Nova when this strategy disables a compute service.
    REASON_FOR_DISABLE = 'Watcher node resource consolidation strategy'

    def __init__(self, config, osc=None):
        """node resource consolidation

        :param config: A mapping containing the configuration of this strategy
        :type config: :py:class:`~.Struct` instance
        :param osc: :py:class:`~.OpenStackClients` instance
        """
        super(NodeResourceConsolidation, self).__init__(config, osc)
        self.host_choice = 'auto'
        self.audit = None
        self.compute_nodes_count = 0
        self.number_of_released_nodes = 0
        self.number_of_migrations = 0

    @classmethod
    def get_name(cls):
        """Return the strategy's unique entry-point name."""
        return "node_resource_consolidation"

    @classmethod
    def get_display_name(cls):
        """Return the translated display name of the strategy."""
        return _("Node Resource Consolidation strategy")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated (message-id) display name."""
        return "Node Resource Consolidation strategy"

    @classmethod
    def get_schema(cls):
        """Return the JSON schema of the strategy's input parameters."""
        # Mandatory default setting for each element
        return {
            "properties": {
                "host_choice": {
                    "description": "The way to select the server migration "
                                   "destination node. The value 'auto' "
                                   "means that Nova scheduler selects "
                                   "the destination node, and 'specify' "
                                   "means the strategy specifies the "
                                   "destination.",
                    "type": "string",
                    "default": 'auto'
                },
            },
        }

    def check_resources(self, servers, destination):
        """Try to fit the given servers on the destination node.

        Each server that fits is removed from *servers* in place and its
        vcpu/memory demand is deducted from the node's free resources.

        :param servers: list of server model elements; mutated in place
        :param destination: candidate destination compute node (may be falsy)
        :return: True if at least one server fits on the destination
        """
        # check whether a node able to accommodate a VM
        dest_flag = False
        if not destination:
            return dest_flag
        free_res = self.compute_model.get_node_free_resources(destination)
        # Iterate over a copy: removing an element from the list being
        # iterated would silently skip the server right after each removal,
        # under-packing the destination node.
        for server in list(servers):
            # just vcpu and memory, do not consider disk
            if free_res['vcpu'] >= server.vcpus and (
                    free_res['memory'] >= server.memory):
                free_res['vcpu'] -= server.vcpus
                free_res['memory'] -= server.memory
                dest_flag = True
                servers.remove(server)

        return dest_flag

    def select_destination(self, server, source, destinations):
        """Pick a destination node for one server and record the migration.

        Candidates are tried in ascending order of free vcpu so that the
        fullest nodes are filled first (tightest fit).

        :param server: server model element to migrate
        :param source: node the server currently runs on
        :param destinations: list of candidate destination nodes
        :return: the chosen destination node, or None if none fits
        """
        dest_node = None
        if not destinations:
            return dest_node
        sorted_nodes = sorted(
            destinations,
            key=lambda x: self.compute_model.get_node_free_resources(
                x)['vcpu'])
        for dest in sorted_nodes:
            if self.check_resources([server], dest):
                # update the model so later placements see this move
                if self.compute_model.migrate_instance(server, source, dest):
                    dest_node = dest
                    break

        return dest_node

    def add_migrate_actions(self, sources, destinations):
        """Add live-migration actions draining each source node.

        Servers are migrated biggest-vcpu-first. With host_choice 'auto'
        Nova scheduler picks the target; otherwise the strategy picks it
        via :meth:`select_destination`.

        :param sources: nodes to be drained
        :param destinations: candidate destination nodes
        """
        if not sources or not destinations:
            return
        for node in sources:
            servers = self.compute_model.get_node_instances(node)
            sorted_servers = sorted(
                servers,
                key=lambda x: x.vcpus,
                reverse=True)
            for server in sorted_servers:
                parameters = {'migration_type': 'live',
                              'source_node': node.hostname,
                              'resource_name': server.name}
                action_flag = False
                if self.host_choice != 'auto':
                    # specify destination host
                    dest = self.select_destination(server, node, destinations)
                    if dest:
                        parameters['destination_node'] = dest.hostname
                        action_flag = True
                else:
                    # let Nova scheduler choose the destination
                    action_flag = True
                if action_flag:
                    self.number_of_migrations += 1
                    self.solution.add_action(
                        action_type=self.MIGRATION,
                        resource_id=server.uuid,
                        input_parameters=parameters)

    def add_change_node_state_actions(self, nodes, status):
        """Add actions switching nodes to the given service state.

        :param nodes: compute node model elements to update
        :param status: target state; must be one of
            :py:class:`element.ServiceState` DISABLED/ENABLED values
        :return: the nodes whose state actually changed
        :raises exception.IllegalArgumentException: if *status* is invalid
        """
        if status not in (element.ServiceState.DISABLED.value,
                          element.ServiceState.ENABLED.value):
            raise exception.IllegalArgumentException(
                message=_("The node status is not defined"))
        changed_nodes = []
        for node in nodes:
            # skip nodes already in the requested state
            if node.status != status:
                parameters = {'state': status,
                              'resource_name': node.hostname}
                if status == element.ServiceState.DISABLED.value:
                    parameters['disabled_reason'] = self.REASON_FOR_DISABLE
                self.solution.add_action(
                    action_type=self.CHANGE_NOVA_SERVICE_STATE,
                    resource_id=node.uuid,
                    input_parameters=parameters)
                node.status = status
                changed_nodes.append(node)

        return changed_nodes

    def get_nodes_migrate_failed(self):
        """Return nodes hosting servers whose migration previously failed.

        Only meaningful for CONTINUOUS audits; a ONESHOT audit (or no audit)
        yields an empty list.

        :return: list of compute node model elements
        """
        # check if migration action ever failed
        # just for continuous audit
        nodes_failed = []
        if self.audit is None or (
                self.audit.audit_type ==
                objects.audit.AuditType.ONESHOT.value):
            return nodes_failed
        filters = {'audit_uuid': self.audit.uuid}
        actions = objects.action.Action.list(
            self.ctx,
            filters=filters)
        for action in actions:
            if action.state == objects.action.State.FAILED and (
                    action.action_type == self.MIGRATION):
                server_uuid = action.input_parameters.get('resource_id')
                node = self.compute_model.get_node_by_instance_uuid(
                    server_uuid)
                if node not in nodes_failed:
                    nodes_failed.append(node)

        return nodes_failed

    def group_nodes(self, nodes):
        """Partition nodes into free, source and destination groups.

        Nodes are walked from least to most used vcpu; the emptiest busy
        nodes become migration sources and the fullest enabled nodes become
        destinations, until the two frontiers meet.

        :param nodes: all compute node model elements
        :return: tuple (free_nodes, source_nodes, dest_nodes)
        """
        free_nodes = []
        source_nodes = []
        dest_nodes = []
        nodes_failed = self.get_nodes_migrate_failed()
        LOG.info("nodes: %s migration failed", nodes_failed)
        sorted_nodes = sorted(
            nodes,
            key=lambda x: self.compute_model.get_node_used_resources(
                x)['vcpu'])
        for node in sorted_nodes:
            # frontiers met: remaining nodes are already destinations
            if node in dest_nodes:
                break
            # If ever migration failed, do not migrate again
            if node in nodes_failed:
                # maybe can as the destination node
                if node.status == element.ServiceState.ENABLED.value:
                    dest_nodes.append(node)
                continue
            used_resource = self.compute_model.get_node_used_resources(node)
            if used_resource['vcpu'] > 0:
                servers = self.compute_model.get_node_instances(node)
                # try to place this node's servers on the most-used nodes
                for dest in reversed(sorted_nodes):
                    # skip if compute node is disabled
                    if dest.status == element.ServiceState.DISABLED.value:
                        LOG.info("node %s is down", dest.hostname)
                        continue
                    if dest in dest_nodes:
                        continue
                    if node == dest:
                        # the last remaining node serves as a destination
                        dest_nodes.append(dest)
                        break
                    if self.check_resources(servers, dest):
                        dest_nodes.append(dest)
                        if node not in source_nodes:
                            source_nodes.append(node)
                    # all servers placed, move to the next source node
                    if not servers:
                        break
            else:
                # node runs no servers at all
                free_nodes.append(node)

        return free_nodes, source_nodes, dest_nodes

    def pre_execute(self):
        """Pre-execution phase: read input parameters and pick the planner."""
        self._pre_execute()
        self.host_choice = self.input_parameters.get('host_choice', 'auto')
        self.planner = 'node_resource_consolidation'

    def do_execute(self, audit=None):
        """Strategy execution phase

        Executing strategy and creating solution.
        """
        self.audit = audit
        nodes = list(self.compute_model.get_all_compute_nodes().values())
        free_nodes, source_nodes, dest_nodes = self.group_nodes(nodes)
        self.compute_nodes_count = len(nodes)
        self.number_of_released_nodes = len(source_nodes)
        LOG.info("Free nodes: %s", free_nodes)
        LOG.info("Source nodes: %s", source_nodes)
        LOG.info("Destination nodes: %s", dest_nodes)
        if not source_nodes:
            LOG.info("No compute node needs to be consolidated")
            return
        nodes_disabled = []
        if self.host_choice == 'auto':
            # disable compute node to avoid to be select by Nova scheduler
            nodes_disabled = self.add_change_node_state_actions(
                free_nodes+source_nodes, element.ServiceState.DISABLED.value)
        self.add_migrate_actions(source_nodes, dest_nodes)
        if nodes_disabled:
            # restore disabled compute node after migration
            self.add_change_node_state_actions(
                nodes_disabled, element.ServiceState.ENABLED.value)

    def post_execute(self):
        """Post-execution phase

        """
        # publish efficacy numbers collected during do_execute
        self.solution.set_efficacy_indicators(
            compute_nodes_count=self.compute_nodes_count,
            released_compute_nodes_count=self.number_of_released_nodes,
            instance_migrations_count=self.number_of_migrations,
        )