Coverage for watcher/api/controllers/v1/audit_template.py: 88%

371 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright 2013 Red Hat, Inc. 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

14# implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18""" 

19An :ref:`Audit <audit_definition>` may be launched several times with the same 

20settings (:ref:`Goal <goal_definition>`, thresholds, ...). Therefore it makes 

21sense to save those settings in some sort of Audit preset object, which is 

22known as an :ref:`Audit Template <audit_template_definition>`. 

23 

24An :ref:`Audit Template <audit_template_definition>` contains at least the 

25:ref:`Goal <goal_definition>` of the :ref:`Audit <audit_definition>`. 

26 

27It may also contain some error handling settings indicating whether: 

28 

29- :ref:`Watcher Applier <watcher_applier_definition>` stops the 

30 entire operation 

31- :ref:`Watcher Applier <watcher_applier_definition>` performs a rollback 

32 

33and how many retries should be attempted before failure occurs (also the latter 

34can be complex: for example the scenario in which there are many first-time 

35failures on ultimately successful :ref:`Actions <action_definition>`). 

36 

37Moreover, an :ref:`Audit Template <audit_template_definition>` may contain some 

38settings related to the level of automation for the 

39:ref:`Action Plan <action_plan_definition>` that will be generated by the 

40:ref:`Audit <audit_definition>`. 

41A flag will indicate whether the :ref:`Action Plan <action_plan_definition>` 

42will be launched automatically or will need a manual confirmation from the 

43:ref:`Administrator <administrator_definition>`. 

44""" 

45 

46from http import HTTPStatus 

47from oslo_utils import timeutils 

48import pecan 

49from pecan import rest 

50import wsme 

51from wsme import types as wtypes 

52import wsmeext.pecan as wsme_pecan 

53 

54from watcher._i18n import _ 

55from watcher.api.controllers import base 

56from watcher.api.controllers import link 

57from watcher.api.controllers.v1 import collection 

58from watcher.api.controllers.v1 import types 

59from watcher.api.controllers.v1 import utils as api_utils 

60from watcher.common import context as context_utils 

61from watcher.common import exception 

62from watcher.common import policy 

63from watcher.common import utils as common_utils 

64from watcher.decision_engine.loading import default as default_loading 

65from watcher import objects 

66 

67 

def hide_fields_in_newer_versions(obj):
    """Hide the fields of ``obj`` that newer API versions introduced.

    Fields added by a later API version are only meant to be exposed when
    the request's API version matches or exceeds the version that
    introduced them. This implementation currently performs no filtering
    and leaves ``obj`` untouched.

    :param obj: the API object whose fields would be filtered.
    """
    return None

76 

77 

class AuditTemplatePostType(wtypes.Base):
    """Request body used to create an audit template.

    ``validate`` resolves the goal and strategy references (UUID or name)
    to UUIDs and checks the audit scope.
    """

    # Context used for the goal and strategy lookups done in ``validate``.
    _ctx = context_utils.make_context()

    name = wtypes.wsattr(wtypes.text, mandatory=True)
    """Name of this audit template"""

    description = wtypes.wsattr(wtypes.text, mandatory=False)
    """Short description of this audit template"""

    goal = wtypes.wsattr(wtypes.text, mandatory=True)
    """Goal UUID or name of the audit template"""

    strategy = wtypes.wsattr(wtypes.text, mandatory=False)
    """Strategy UUID or name of the audit template"""

    scope = wtypes.wsattr(types.jsontype, mandatory=False, default=[])
    """Audit Scope"""

    def as_audit_template(self):
        """Build the :class:`AuditTemplate` API object for this payload.

        :returns: an ``AuditTemplate`` populated from this payload.
        """
        # ``AuditTemplate.__init__`` reads ``goal_id``/``strategy_id`` to
        # feed its goal/strategy properties, hence the same reference is
        # passed under both keys.
        return AuditTemplate(
            name=self.name,
            description=self.description,
            goal_id=self.goal,  # Dirty trick ...
            goal=self.goal,
            strategy_id=self.strategy,  # Dirty trick ...
            strategy_uuid=self.strategy,
            scope=self.scope,
        )

    @staticmethod
    def _build_schema():
        """Return the JSON schema (draft 4) used to validate the scope."""
        SCHEMA = {
            "$schema": "http://json-schema.org/draft-04/schema#",
            "type": "array",
            "items": {
                "type": "object",
                "properties": AuditTemplatePostType._get_schemas(),
                "additionalProperties": False
            }
        }
        return SCHEMA

    @staticmethod
    def _get_schemas():
        """Map each available collector name to its ``SCHEMA`` attribute.

        Collectors that do not define ``SCHEMA`` are left out.
        """
        collectors = default_loading.ClusterDataModelCollectorLoader(
        ).list_available()
        schemas = {k: c.SCHEMA for k, c
                   in collectors.items() if hasattr(c, "SCHEMA")}
        return schemas

    @staticmethod
    def _resolve_goal(goal_ref):
        """Return the goal whose UUID or name equals ``goal_ref``.

        A UUID match takes precedence over a name match.

        :raises: InvalidGoal if no goal matches.
        """
        available_goals = objects.Goal.list(AuditTemplatePostType._ctx)
        available_goal_uuids_map = {g.uuid: g for g in available_goals}
        available_goal_names_map = {g.name: g for g in available_goals}
        if goal_ref in available_goal_uuids_map:
            return available_goal_uuids_map[goal_ref]
        if goal_ref in available_goal_names_map:
            return available_goal_names_map[goal_ref]
        raise exception.InvalidGoal(goal=goal_ref)

    @staticmethod
    def _validate_scope(audit_template):
        """Normalise and validate the (non-empty) scope of the payload.

        :raises: Invalid if host aggregates are both included and excluded.
        """
        keys = [list(s)[0] for s in audit_template.scope]
        if keys[0] not in ('compute', 'storage'):
            # A scope without a top-level section is wrapped as a compute
            # scope.
            audit_template.scope = [dict(compute=audit_template.scope)]
        common_utils.Draft4Validator(
            AuditTemplatePostType._build_schema()
        ).validate(audit_template.scope)

        include_host_aggregates = False
        exclude_host_aggregates = False
        # Look at every compute section instead of assuming that the first
        # scope entry is the compute one: a scope starting with 'storage'
        # has no 'compute' key in its first entry.
        for scope_entry in audit_template.scope:
            for rule in scope_entry.get('compute', []):
                if 'host_aggregates' in rule:
                    include_host_aggregates = True
                elif 'exclude' in rule:
                    for resource in rule['exclude']:
                        if 'host_aggregates' in resource:
                            exclude_host_aggregates = True
        if include_host_aggregates and exclude_host_aggregates:
            raise exception.Invalid(
                message=_(
                    "host_aggregates can't be "
                    "included and excluded together"))

    @staticmethod
    def _resolve_strategy(strategy_ref, goal):
        """Return the strategy designated by ``strategy_ref``.

        :param strategy_ref: UUID, integer ID or name of the strategy.
        :param goal: the goal the strategy has to belong to.
        :raises: InvalidStrategy if the strategy cannot be loaded or is not
            related to ``goal``.
        """
        ctx = AuditTemplatePostType._ctx
        try:
            if (common_utils.is_uuid_like(strategy_ref) or
                    common_utils.is_int_like(strategy_ref)):
                strategy = objects.Strategy.get(ctx, strategy_ref)
            else:
                strategy = objects.Strategy.get_by_name(ctx, strategy_ref)
        except Exception:
            raise exception.InvalidStrategy(strategy=strategy_ref)

        # Check that the strategy we indicate is actually related to the
        # specified goal
        if strategy.goal_id != goal.id:
            # Only strategies of the requested goal are valid choices.
            choices = ["'%s' (%s)" % (s.uuid, s.name)
                       for s in objects.Strategy.list(ctx)
                       if s.goal_id == goal.id]
            raise exception.InvalidStrategy(
                message=_(
                    "'%(strategy)s' strategy does not relate to the "
                    "'%(goal)s' goal. Possible choices: %(choices)s")
                % dict(strategy=strategy.name, goal=goal.name,
                       choices=", ".join(choices)))
        return strategy

    @staticmethod
    def validate(audit_template):
        """Validate and normalise an audit template creation payload.

        :param audit_template: the payload to check; its ``goal``,
            ``strategy`` and ``scope`` attributes may be rewritten.
        :returns: the same payload, with ``goal`` (and ``strategy`` when
            given) replaced by the UUID of the matching object.
        :raises: InvalidGoal if the goal matches no known UUID or name.
        :raises: InvalidStrategy if the strategy cannot be loaded or is not
            related to the goal.
        :raises: Invalid if host aggregates are both included and excluded.
        """
        goal = AuditTemplatePostType._resolve_goal(audit_template.goal)

        if audit_template.scope:
            AuditTemplatePostType._validate_scope(audit_template)

        if audit_template.strategy:
            strategy = AuditTemplatePostType._resolve_strategy(
                audit_template.strategy, goal)
            audit_template.strategy = strategy.uuid

        # We force the UUID so that we do not need to query the DB with the
        # name afterwards
        audit_template.goal = goal.uuid

        return audit_template

196 

197 

class AuditTemplatePatchType(types.JsonPatchType):
    """JSON patch entry applied to an audit template.

    Patches on ``/goal`` and ``/strategy`` are rewritten to target
    ``/goal_id`` and ``/strategy_id`` with the ID of the matching object.
    """

    # Context used for the goal and strategy lookups.
    _ctx = context_utils.make_context()

    @staticmethod
    def mandatory_attrs():
        """Return the attributes that cannot be removed (none)."""
        return []

    @staticmethod
    def validate(patch):
        """Check one patch entry and translate goal/strategy references.

        :raises: OperationNotPermitted when trying to remove the goal.
        """
        if patch.path == "/goal":
            if patch.op == "remove":
                raise exception.OperationNotPermitted(
                    _("Cannot remove 'goal' attribute "
                      "from an audit template"))
            AuditTemplatePatchType._validate_goal(patch)
        elif patch.path == "/strategy":
            AuditTemplatePatchType._validate_strategy(patch)
        return types.JsonPatchType.validate(patch)

    @staticmethod
    def _validate_goal(patch):
        """Retarget the patch to ``/goal_id`` and resolve its value.

        :raises: InvalidGoal if the value matches no goal UUID or name.
        """
        patch.path = "/goal_id"
        wanted = patch.value
        if not wanted:
            return

        known_goals = objects.Goal.list(AuditTemplatePatchType._ctx)
        by_uuid = {g.uuid: g for g in known_goals}
        by_name = {g.name: g for g in known_goals}
        # UUID matches win over name matches.
        for lookup in (by_uuid, by_name):
            if wanted in lookup:
                patch.value = lookup[wanted].id
                return
        raise exception.InvalidGoal(goal=wanted)

    @staticmethod
    def _validate_strategy(patch):
        """Retarget the patch to ``/strategy_id`` and resolve its value.

        :raises: InvalidStrategy if the value matches no strategy UUID or
            name.
        """
        patch.path = "/strategy_id"
        wanted = patch.value
        if not wanted:
            return

        known_strategies = objects.Strategy.list(
            AuditTemplatePatchType._ctx)
        by_uuid = {s.uuid: s for s in known_strategies}
        by_name = {s.name: s for s in known_strategies}
        # UUID matches win over name matches.
        for lookup in (by_uuid, by_name):
            if wanted in lookup:
                patch.value = lookup[wanted].id
                return
        raise exception.InvalidStrategy(strategy=wanted)

252 

253 

class AuditTemplate(base.APIBase):
    """API representation of an audit template.

    This class enforces type checking and value constraints, and converts
    between the internal object model and the API representation of an
    audit template.
    """

    _goal_uuid = None
    _goal_name = None

    _strategy_uuid = None
    _strategy_name = None

    def _get_goal(self, value):
        """Load the goal designated by ``value`` (UUID, int ID or name).

        When a goal is found, ``self.goal_id`` is set to its ID.

        :returns: the goal, or None if ``value`` is unset or unknown.
        """
        if value == wtypes.Unset:
            return None
        is_identifier = (common_utils.is_uuid_like(value) or
                         common_utils.is_int_like(value))
        fetch = objects.Goal.get if is_identifier else objects.Goal.get_by_name
        try:
            found = fetch(pecan.request.context, value)
        except exception.GoalNotFound:
            return None
        if found:
            self.goal_id = found.id
        return found

    def _get_strategy(self, value):
        """Load the strategy designated by ``value`` (UUID, int ID or name).

        When a strategy is found, ``self.strategy_id`` is set to its ID.

        :returns: the strategy, or None if ``value`` is unset or unknown.
        """
        if value == wtypes.Unset:
            return None
        is_identifier = (common_utils.is_uuid_like(value) or
                         common_utils.is_int_like(value))
        fetch = (objects.Strategy.get if is_identifier
                 else objects.Strategy.get_by_name)
        try:
            found = fetch(pecan.request.context, value)
        except exception.StrategyNotFound:
            return None
        if found:
            self.strategy_id = found.id
        return found

    def _get_goal_uuid(self):
        return self._goal_uuid

    def _set_goal_uuid(self, value):
        # The stored UUID is reset, then refreshed from the resolved goal.
        if not value:
            return
        if self._goal_uuid != value:
            self._goal_uuid = None
            resolved = self._get_goal(value)
            if resolved:
                self._goal_uuid = resolved.uuid

    def _get_strategy_uuid(self):
        return self._strategy_uuid

    def _set_strategy_uuid(self, value):
        # The stored UUID is reset, then refreshed from the resolved
        # strategy.
        if not value:
            return
        if self._strategy_uuid != value:
            self._strategy_uuid = None
            resolved = self._get_strategy(value)
            if resolved:
                self._strategy_uuid = resolved.uuid

    def _get_goal_name(self):
        return self._goal_name

    def _set_goal_name(self, value):
        # The stored name is reset, then refreshed from the resolved goal.
        if not value:
            return
        if self._goal_name != value:
            self._goal_name = None
            resolved = self._get_goal(value)
            if resolved:
                self._goal_name = resolved.name

    def _get_strategy_name(self):
        return self._strategy_name

    def _set_strategy_name(self, value):
        # The stored name is reset, then refreshed from the resolved
        # strategy.
        if not value:
            return
        if self._strategy_name != value:
            self._strategy_name = None
            resolved = self._get_strategy(value)
            if resolved:
                self._strategy_name = resolved.name

    uuid = wtypes.wsattr(types.uuid, readonly=True)
    """Unique UUID for this audit template"""

    name = wtypes.text
    """Name of this audit template"""

    description = wtypes.wsattr(wtypes.text, mandatory=False)
    """Short description of this audit template"""

    goal_uuid = wtypes.wsproperty(
        wtypes.text, _get_goal_uuid, _set_goal_uuid, mandatory=True)
    """Goal UUID the audit template refers to"""

    goal_name = wtypes.wsproperty(
        wtypes.text, _get_goal_name, _set_goal_name, mandatory=False)
    """The name of the goal this audit template refers to"""

    strategy_uuid = wtypes.wsproperty(
        wtypes.text, _get_strategy_uuid, _set_strategy_uuid, mandatory=False)
    """Strategy UUID the audit template refers to"""

    strategy_name = wtypes.wsproperty(
        wtypes.text, _get_strategy_name, _set_strategy_name, mandatory=False)
    """The name of the strategy this audit template refers to"""

    audits = wtypes.wsattr([link.Link], readonly=True)
    """Links to the collection of audits contained in this audit template"""

    links = wtypes.wsattr([link.Link], readonly=True)
    """A list containing a self link and associated audit template links"""

    scope = wtypes.wsattr(types.jsontype, mandatory=False)
    """Audit Scope"""

    def __init__(self, **kwargs):
        """Populate the API object from keyword arguments.

        Object fields this class exposes are copied from ``kwargs``
        (``wtypes.Unset`` when absent). The goal and strategy properties
        are then fed from the ``goal_id`` and ``strategy_id`` keys.
        """
        super().__init__()
        self.fields = []

        for field_name in list(objects.AuditTemplate.fields):
            # Only fields exposed by this API class are kept.
            if hasattr(self, field_name):
                self.fields.append(field_name)
                setattr(self, field_name,
                        kwargs.get(field_name, wtypes.Unset))

        self.fields.extend(['goal_id', 'strategy_id'])
        self.strategy_id = kwargs.get('strategy_id', wtypes.Unset)

        # goal_uuid & strategy_uuid are not part of
        # objects.AuditTemplate.fields because they're API-only attributes.
        self.fields.extend(
            ['goal_uuid', 'goal_name', 'strategy_uuid', 'strategy_name'])
        goal_ref = kwargs.get('goal_id', wtypes.Unset)
        strategy_ref = kwargs.get('strategy_id', wtypes.Unset)
        self.goal_uuid = goal_ref
        self.goal_name = goal_ref
        self.strategy_uuid = strategy_ref
        self.strategy_name = strategy_ref

    @staticmethod
    def _convert_with_links(audit_template, url, expand=True):
        """Hide the internal IDs and attach the self/bookmark links.

        :param audit_template: the API object to update in place.
        :param url: base URL used to build the links.
        :param expand: when False, only the summary fields are kept.
        :returns: the updated ``audit_template``.
        """
        if not expand:
            summary_fields = ['uuid', 'name', 'goal_uuid', 'goal_name',
                              'scope', 'strategy_uuid', 'strategy_name']
            audit_template.unset_fields_except(summary_fields)

        # The numeric ID should not be exposed to
        # the user, it's internal only.
        audit_template.goal_id = wtypes.Unset
        audit_template.strategy_id = wtypes.Unset

        self_link = link.Link.make_link(
            'self', url, 'audit_templates', audit_template.uuid)
        bookmark_link = link.Link.make_link(
            'bookmark', url, 'audit_templates', audit_template.uuid,
            bookmark=True)
        audit_template.links = [self_link, bookmark_link]
        return audit_template

    @classmethod
    def convert_with_links(cls, rpc_audit_template, expand=True):
        """Convert an audit template object into its API representation."""
        api_template = AuditTemplate(**rpc_audit_template.as_dict())
        hide_fields_in_newer_versions(api_template)
        base_url = pecan.request.host_url
        return cls._convert_with_links(api_template, base_url, expand)

    @classmethod
    def sample(cls, expand=True):
        """Return a sample audit template."""
        sample_values = dict(
            uuid='27e3153e-d5bf-4b7e-b517-fb518e17f34c',
            name='My Audit Template',
            description='Description of my audit template',
            goal_uuid='83e44733-b640-40e2-8d8a-7dd3be7134e6',
            strategy_uuid='367d826e-b6a4-4b70-bc44-c3f6fe1c9986',
            created_at=timeutils.utcnow(),
            deleted_at=None,
            updated_at=timeutils.utcnow(),
            scope=[],
        )
        return cls._convert_with_links(
            cls(**sample_values), 'http://localhost:9322', expand)

447 

448 

class AuditTemplateCollection(collection.Collection):
    """API representation of a collection of audit templates."""

    audit_templates = [AuditTemplate]
    """A list containing audit templates objects"""

    def __init__(self, **kwargs):
        super().__init__()
        self._type = 'audit_templates'

    @staticmethod
    def convert_with_links(rpc_audit_templates, limit, url=None, expand=False,
                           **kwargs):
        """Build a collection from a list of audit template objects.

        :param rpc_audit_templates: the audit template objects to convert.
        :param limit: page size, forwarded to ``get_next``.
        :param url: resource URL, forwarded to ``get_next``.
        :param expand: whether each item keeps all of its fields.
        :param kwargs: extra arguments forwarded to ``get_next``.
        """
        result = AuditTemplateCollection()
        converted = []
        for rpc_template in rpc_audit_templates:
            converted.append(
                AuditTemplate.convert_with_links(rpc_template, expand))
        result.audit_templates = converted
        result.next = result.get_next(limit, url=url, **kwargs)
        return result

    @classmethod
    def sample(cls):
        """Return a sample collection holding one sample audit template."""
        example = cls()
        example.audit_templates = [AuditTemplate.sample(expand=False)]
        return example

474 

475 

class AuditTemplatesController(rest.RestController):
    """REST controller for AuditTemplates."""

    def __init__(self):
        super(AuditTemplatesController, self).__init__()

    from_audit_templates = False
    """A flag to indicate if the requests to this controller are coming
    from the top-level resource AuditTemplates."""

    _custom_actions = {
        'detail': ['GET'],
    }

    @staticmethod
    def _build_filters(goal, strategy):
        """Translate the goal/strategy query parameters into list filters.

        A UUID-like value filters on the ``*_uuid`` key, anything else on
        the ``*_name`` key. Empty values add no filter.
        """
        filters = {}
        if goal:
            if common_utils.is_uuid_like(goal):
                filters['goal_uuid'] = goal
            else:
                filters['goal_name'] = goal

        if strategy:
            if common_utils.is_uuid_like(strategy):
                filters['strategy_uuid'] = strategy
            else:
                filters['strategy_name'] = strategy
        return filters

    def _get_audit_templates_collection(self, filters, marker, limit,
                                        sort_key, sort_dir, expand=False,
                                        resource_url=None):
        """Validate the listing parameters and build the collection.

        :param filters: dict of filters applied to the listing.
        :param marker: UUID of the pagination marker, if any.
        :param limit: maximum number of resources to return.
        :param sort_key: field to sort by.
        :param sort_dir: direction to sort.
        :param expand: whether the items keep all of their fields.
        :param resource_url: URL used to build the pagination link.
        """
        additional_fields = ["goal_uuid", "goal_name", "strategy_uuid",
                             "strategy_name"]

        api_utils.validate_sort_key(
            sort_key, list(objects.AuditTemplate.fields) + additional_fields)
        api_utils.validate_search_filters(
            filters, list(objects.AuditTemplate.fields) + additional_fields)
        limit = api_utils.validate_limit(limit)
        api_utils.validate_sort_dir(sort_dir)

        marker_obj = None
        if marker:
            marker_obj = objects.AuditTemplate.get_by_uuid(
                pecan.request.context,
                marker)

        # When the sort has to be done at the API level, no sort key is
        # handed to the object listing.
        need_api_sort = api_utils.check_need_api_sort(sort_key,
                                                      additional_fields)
        sort_db_key = (sort_key if not need_api_sort
                       else None)

        audit_templates = objects.AuditTemplate.list(
            pecan.request.context, filters, limit, marker_obj,
            sort_key=sort_db_key, sort_dir=sort_dir)

        audit_templates_collection = \
            AuditTemplateCollection.convert_with_links(
                audit_templates, limit, url=resource_url, expand=expand,
                sort_key=sort_key, sort_dir=sort_dir)

        if need_api_sort:
            api_utils.make_api_sort(
                audit_templates_collection.audit_templates, sort_key,
                sort_dir)

        return audit_templates_collection

    @wsme_pecan.wsexpose(AuditTemplateCollection, wtypes.text, wtypes.text,
                         types.uuid, int, wtypes.text, wtypes.text)
    def get_all(self, goal=None, strategy=None, marker=None,
                limit=None, sort_key='id', sort_dir='asc'):
        """Retrieve a list of audit templates.

        :param goal: goal UUID or name to filter by
        :param strategy: strategy UUID or name to filter by
        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        """
        context = pecan.request.context
        policy.enforce(context, 'audit_template:get_all',
                       action='audit_template:get_all')
        filters = self._build_filters(goal, strategy)

        return self._get_audit_templates_collection(
            filters, marker, limit, sort_key, sort_dir)

    @wsme_pecan.wsexpose(AuditTemplateCollection, wtypes.text, wtypes.text,
                         types.uuid, int, wtypes.text, wtypes.text)
    def detail(self, goal=None, strategy=None, marker=None,
               limit=None, sort_key='id', sort_dir='asc'):
        """Retrieve a list of audit templates with detail.

        :param goal: goal UUID or name to filter by
        :param strategy: strategy UUID or name to filter by
        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        """
        context = pecan.request.context
        policy.enforce(context, 'audit_template:detail',
                       action='audit_template:detail')

        # NOTE(lucasagomes): /detail should only work against collections
        parent = pecan.request.path.split('/')[:-1][-1]
        if parent != "audit_templates":
            raise exception.HTTPNotFound

        filters = self._build_filters(goal, strategy)

        expand = True
        resource_url = '/'.join(['audit_templates', 'detail'])
        return self._get_audit_templates_collection(filters, marker, limit,
                                                    sort_key, sort_dir, expand,
                                                    resource_url)

    @wsme_pecan.wsexpose(AuditTemplate, wtypes.text)
    def get_one(self, audit_template):
        """Retrieve information about the given audit template.

        :param audit_template: UUID or name of an audit template.
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        rpc_audit_template = api_utils.get_resource('AuditTemplate',
                                                    audit_template)
        policy.enforce(context, 'audit_template:get', rpc_audit_template,
                       action='audit_template:get')

        return AuditTemplate.convert_with_links(rpc_audit_template)

    @wsme.validate(types.uuid, AuditTemplatePostType)
    @wsme_pecan.wsexpose(AuditTemplate, body=AuditTemplatePostType,
                         status_code=HTTPStatus.CREATED)
    def post(self, audit_template_postdata):
        """Create a new audit template.

        :param audit_template_postdata: the audit template POST data
                                        from the request body.
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        policy.enforce(context, 'audit_template:create',
                       action='audit_template:create')

        audit_template = audit_template_postdata.as_audit_template()
        audit_template_dict = audit_template.as_dict()
        new_audit_template = objects.AuditTemplate(context,
                                                   **audit_template_dict)
        new_audit_template.create()

        # Set the HTTP Location Header
        pecan.response.location = link.build_url(
            'audit_templates', new_audit_template.uuid)
        return AuditTemplate.convert_with_links(new_audit_template)

    @wsme.validate(types.uuid, [AuditTemplatePatchType])
    @wsme_pecan.wsexpose(AuditTemplate, wtypes.text,
                         body=[AuditTemplatePatchType])
    def patch(self, audit_template, patch):
        """Update an existing audit template.

        :param audit_template: UUID or name of an audit template.
        :param patch: a json PATCH document to apply to this audit template.
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        audit_template_to_update = api_utils.get_resource('AuditTemplate',
                                                          audit_template)
        policy.enforce(context, 'audit_template:update',
                       audit_template_to_update,
                       action='audit_template:update')

        # NOTE(review): the object is loaded a second time here; this looks
        # redundant with the get_resource() call above -- confirm against
        # api_utils.get_resource before removing it.
        if common_utils.is_uuid_like(audit_template):
            audit_template_to_update = objects.AuditTemplate.get_by_uuid(
                pecan.request.context,
                audit_template)
        else:
            audit_template_to_update = objects.AuditTemplate.get_by_name(
                pecan.request.context,
                audit_template)

        try:
            audit_template_dict = audit_template_to_update.as_dict()
            audit_template = AuditTemplate(**api_utils.apply_jsonpatch(
                audit_template_dict, patch))
        except api_utils.JSONPATCH_EXCEPTIONS as e:
            raise exception.PatchError(patch=patch, reason=e)

        # Update only the fields that have changed
        for field in objects.AuditTemplate.fields:
            try:
                patch_val = getattr(audit_template, field)
            except AttributeError:
                # Ignore fields that aren't exposed in the API
                continue
            if patch_val == wtypes.Unset:
                patch_val = None
            if audit_template_to_update[field] != patch_val:
                audit_template_to_update[field] = patch_val

        audit_template_to_update.save()
        return AuditTemplate.convert_with_links(audit_template_to_update)

    @wsme_pecan.wsexpose(None, wtypes.text, status_code=HTTPStatus.NO_CONTENT)
    def delete(self, audit_template):
        """Delete an audit template.

        :param audit_template: UUID or name of an audit template.
        """
        context = pecan.request.context
        audit_template_to_delete = api_utils.get_resource('AuditTemplate',
                                                          audit_template)
        policy.enforce(context, 'audit_template:delete',
                       audit_template_to_delete,
                       action='audit_template:delete')

        audit_template_to_delete.soft_delete()