Coverage for watcher/api/controllers/v1/audit_template.py: 88%
371 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright 2013 Red Hat, Inc.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
18"""
19An :ref:`Audit <audit_definition>` may be launched several times with the same
20settings (:ref:`Goal <goal_definition>`, thresholds, ...). Therefore it makes
21sense to save those settings in some sort of Audit preset object, which is
22known as an :ref:`Audit Template <audit_template_definition>`.
24An :ref:`Audit Template <audit_template_definition>` contains at least the
25:ref:`Goal <goal_definition>` of the :ref:`Audit <audit_definition>`.
27It may also contain some error handling settings indicating whether:
29- :ref:`Watcher Applier <watcher_applier_definition>` stops the
30 entire operation
31- :ref:`Watcher Applier <watcher_applier_definition>` performs a rollback
33and how many retries should be attempted before failure occurs (also the latter
34can be complex: for example the scenario in which there are many first-time
35failures on ultimately successful :ref:`Actions <action_definition>`).
37Moreover, an :ref:`Audit Template <audit_template_definition>` may contain some
38settings related to the level of automation for the
39:ref:`Action Plan <action_plan_definition>` that will be generated by the
40:ref:`Audit <audit_definition>`.
41A flag will indicate whether the :ref:`Action Plan <action_plan_definition>`
42will be launched automatically or will need a manual confirmation from the
43:ref:`Administrator <administrator_definition>`.
44"""
46from http import HTTPStatus
47from oslo_utils import timeutils
48import pecan
49from pecan import rest
50import wsme
51from wsme import types as wtypes
52import wsmeext.pecan as wsme_pecan
54from watcher._i18n import _
55from watcher.api.controllers import base
56from watcher.api.controllers import link
57from watcher.api.controllers.v1 import collection
58from watcher.api.controllers.v1 import types
59from watcher.api.controllers.v1 import utils as api_utils
60from watcher.common import context as context_utils
61from watcher.common import exception
62from watcher.common import policy
63from watcher.common import utils as common_utils
64from watcher.decision_engine.loading import default as default_loading
65from watcher import objects
def hide_fields_in_newer_versions(obj):
    """Hide the fields of ``obj`` that belong to newer API versions.

    A field introduced at a given API version must only be returned when
    the API version of the request is equal to or greater than the version
    that introduced it.

    Nothing is hidden at the moment: ``obj`` is left untouched.
    """
    return None
class AuditTemplatePostType(wtypes.Base):
    """Body of the POST request that creates an audit template.

    ``validate`` resolves the goal and the optional strategy, each given
    either by UUID or by name, to their UUIDs and checks the audit scope.
    """

    # Context used for the goal and strategy lookups done in validate().
    _ctx = context_utils.make_context()

    name = wtypes.wsattr(wtypes.text, mandatory=True)
    """Name of this audit template"""

    description = wtypes.wsattr(wtypes.text, mandatory=False)
    """Short description of this audit template"""

    goal = wtypes.wsattr(wtypes.text, mandatory=True)
    """Goal UUID or name of the audit template"""

    strategy = wtypes.wsattr(wtypes.text, mandatory=False)
    """Strategy UUID or name of the audit template"""

    scope = wtypes.wsattr(types.jsontype, mandatory=False, default=[])
    """Audit Scope"""

    def as_audit_template(self):
        """Build an ``AuditTemplate`` API object out of the posted data."""
        return AuditTemplate(
            name=self.name,
            description=self.description,
            goal_id=self.goal,  # Dirty trick ...
            goal=self.goal,
            strategy_id=self.strategy,  # Dirty trick ...
            strategy_uuid=self.strategy,
            scope=self.scope,
        )

    @staticmethod
    def _build_schema():
        """Return the JSON schema (draft-04) used to check an audit scope."""
        SCHEMA = {
            "$schema": "http://json-schema.org/draft-04/schema#",
            "type": "array",
            "items": {
                "type": "object",
                "properties": AuditTemplatePostType._get_schemas(),
                "additionalProperties": False
            }
        }
        return SCHEMA

    @staticmethod
    def _get_schemas():
        """Map each available collector defining ``SCHEMA`` to that schema."""
        collectors = default_loading.ClusterDataModelCollectorLoader(
            ).list_available()
        schemas = {k: c.SCHEMA for k, c
                   in collectors.items() if hasattr(c, "SCHEMA")}
        return schemas

    @staticmethod
    def _check_host_aggregates(scope):
        """Reject a scope that both includes and excludes host aggregates.

        Only the ``compute`` rules are inspected. Entries without a
        ``compute`` key (e.g. a ``storage`` entry) are skipped instead of
        failing with a ``KeyError``.

        :param scope: list of scope entries, each a dict keyed by the kind
                      of scope it describes
        :raises: Invalid if ``host_aggregates`` is both included and
                 excluded
        """
        include_host_aggregates = False
        exclude_host_aggregates = False
        for entry in scope:
            for rule in entry.get('compute', ()):
                if 'host_aggregates' in rule:
                    include_host_aggregates = True
                elif 'exclude' in rule:
                    if any('host_aggregates' in resource
                           for resource in rule['exclude']):
                        exclude_host_aggregates = True
        if include_host_aggregates and exclude_host_aggregates:
            raise exception.Invalid(
                message=_(
                    "host_aggregates can't be "
                    "included and excluded together"))

    @staticmethod
    def validate(audit_template):
        """Check the posted data and normalize goal/strategy to UUIDs.

        :param audit_template: the ``AuditTemplatePostType`` to validate
        :returns: ``audit_template`` with ``goal`` and, when provided,
                  ``strategy`` replaced by the matching UUID; a scope
                  whose first key is neither ``compute`` nor ``storage``
                  is wrapped into a single ``compute`` entry
        :raises: InvalidGoal if the goal is neither a known goal UUID nor
                 a known goal name
        :raises: Invalid if the scope includes and excludes host
                 aggregates at the same time
        :raises: InvalidStrategy if the strategy cannot be loaded or is
                 not related to the goal
        """
        ctx = AuditTemplatePostType._ctx

        available_goals = objects.Goal.list(ctx)
        available_goal_uuids_map = {g.uuid: g for g in available_goals}
        available_goal_names_map = {g.name: g for g in available_goals}
        if audit_template.goal in available_goal_uuids_map:
            goal = available_goal_uuids_map[audit_template.goal]
        elif audit_template.goal in available_goal_names_map:
            goal = available_goal_names_map[audit_template.goal]
        else:
            raise exception.InvalidGoal(goal=audit_template.goal)

        if audit_template.scope:
            # An empty first entry yields None and is then rejected by the
            # schema validation below rather than by an IndexError here.
            first_key = next(iter(audit_template.scope[0]), None)
            if first_key not in ('compute', 'storage'):
                audit_template.scope = [dict(compute=audit_template.scope)]
            common_utils.Draft4Validator(
                AuditTemplatePostType._build_schema()
                ).validate(audit_template.scope)

            AuditTemplatePostType._check_host_aggregates(
                audit_template.scope)

        if audit_template.strategy:
            try:
                if (common_utils.is_uuid_like(audit_template.strategy) or
                        common_utils.is_int_like(audit_template.strategy)):
                    strategy = objects.Strategy.get(
                        ctx, audit_template.strategy)
                else:
                    strategy = objects.Strategy.get_by_name(
                        ctx, audit_template.strategy)
            except Exception:
                # Any lookup failure is reported as an invalid strategy.
                raise exception.InvalidStrategy(
                    strategy=audit_template.strategy)

            # Check that the strategy we indicate is actually related to the
            # specified goal
            if strategy.goal_id != goal.id:
                # Only the strategies of the requested goal are valid
                # alternatives.
                choices = ["'%s' (%s)" % (s.uuid, s.name)
                           for s in objects.Strategy.list(ctx)
                           if s.goal_id == goal.id]
                raise exception.InvalidStrategy(
                    message=_(
                        "'%(strategy)s' strategy does not relate to the "
                        "'%(goal)s' goal. Possible choices: %(choices)s")
                    % dict(strategy=strategy.name, goal=goal.name,
                           choices=", ".join(choices)))
            audit_template.strategy = strategy.uuid

        # We force the UUID so that we do not need to query the DB with the
        # name afterwards
        audit_template.goal = goal.uuid

        return audit_template
class AuditTemplatePatchType(types.JsonPatchType):
    """JSON patch entry accepted when updating an audit template.

    Patches on ``/goal`` and ``/strategy`` are rewritten to target
    ``/goal_id`` and ``/strategy_id``, and their value, given as a UUID or
    a name, is replaced by the ``id`` of the matching object.
    """

    _ctx = context_utils.make_context()

    @staticmethod
    def mandatory_attrs():
        """Return the attributes that cannot be removed: none here."""
        return []

    @staticmethod
    def validate(patch):
        """Validate one patch entry, translating goal/strategy references.

        :param patch: the patch entry to check
        :returns: the result of ``JsonPatchType.validate`` for ``patch``
        :raises: OperationNotPermitted when the goal is being removed
        """
        if patch.path == "/goal":
            if patch.op == "remove":
                raise exception.OperationNotPermitted(
                    _("Cannot remove 'goal' attribute "
                      "from an audit template"))
            AuditTemplatePatchType._validate_goal(patch)
        if patch.path == "/strategy":
            AuditTemplatePatchType._validate_strategy(patch)
        return types.JsonPatchType.validate(patch)

    @staticmethod
    def _validate_goal(patch):
        """Point the patch at ``/goal_id`` and resolve its value.

        :raises: InvalidGoal if the value matches no goal UUID or name
        """
        patch.path = "/goal_id"
        requested = patch.value
        if not requested:
            return

        known_goals = objects.Goal.list(AuditTemplatePatchType._ctx)
        by_uuid = {g.uuid: g for g in known_goals}
        by_name = {g.name: g for g in known_goals}
        # UUIDs take precedence over names.
        for lookup in (by_uuid, by_name):
            if requested in lookup:
                patch.value = lookup[requested].id
                return
        raise exception.InvalidGoal(goal=requested)

    @staticmethod
    def _validate_strategy(patch):
        """Point the patch at ``/strategy_id`` and resolve its value.

        :raises: InvalidStrategy if the value matches no strategy UUID or
                 name
        """
        patch.path = "/strategy_id"
        requested = patch.value
        if not requested:
            return

        known_strategies = objects.Strategy.list(
            AuditTemplatePatchType._ctx)
        by_uuid = {s.uuid: s for s in known_strategies}
        by_name = {s.name: s for s in known_strategies}
        # UUIDs take precedence over names.
        for lookup in (by_uuid, by_name):
            if requested in lookup:
                patch.value = lookup[requested].id
                return
        raise exception.InvalidStrategy(strategy=requested)
class AuditTemplate(base.APIBase):
    """API representation of an audit template.

    Performs type checking and value validation, and translates between
    the internal object model and what the API exposes for an audit
    template.
    """

    _goal_uuid = None
    _goal_name = None

    _strategy_uuid = None
    _strategy_name = None

    def _get_goal(self, value):
        """Look up a goal and record its numeric ID on this object.

        :param value: goal UUID, integer ID or name
        :returns: the goal, or None when ``value`` is unset or no goal
                  was found
        """
        if value == wtypes.Unset:
            return None
        goal = None
        try:
            by_id = (common_utils.is_uuid_like(value) or
                     common_utils.is_int_like(value))
            lookup = objects.Goal.get if by_id else objects.Goal.get_by_name
            goal = lookup(pecan.request.context, value)
        except exception.GoalNotFound:
            pass
        if goal:
            self.goal_id = goal.id
        return goal

    def _get_strategy(self, value):
        """Look up a strategy and record its numeric ID on this object.

        :param value: strategy UUID, integer ID or name
        :returns: the strategy, or None when ``value`` is unset or no
                  strategy was found
        """
        if value == wtypes.Unset:
            return None
        strategy = None
        try:
            by_id = (common_utils.is_uuid_like(value) or
                     common_utils.is_int_like(value))
            lookup = (objects.Strategy.get if by_id
                      else objects.Strategy.get_by_name)
            strategy = lookup(pecan.request.context, value)
        except exception.StrategyNotFound:
            pass
        if strategy:
            self.strategy_id = strategy.id
        return strategy

    def _get_goal_uuid(self):
        return self._goal_uuid

    def _set_goal_uuid(self, value):
        # Nothing to do for an empty or unchanged reference.
        if not value or self._goal_uuid == value:
            return
        self._goal_uuid = None
        goal = self._get_goal(value)
        if goal:
            self._goal_uuid = goal.uuid

    def _get_strategy_uuid(self):
        return self._strategy_uuid

    def _set_strategy_uuid(self, value):
        # Nothing to do for an empty or unchanged reference.
        if not value or self._strategy_uuid == value:
            return
        self._strategy_uuid = None
        strategy = self._get_strategy(value)
        if strategy:
            self._strategy_uuid = strategy.uuid

    def _get_goal_name(self):
        return self._goal_name

    def _set_goal_name(self, value):
        # Nothing to do for an empty or unchanged reference.
        if not value or self._goal_name == value:
            return
        self._goal_name = None
        goal = self._get_goal(value)
        if goal:
            self._goal_name = goal.name

    def _get_strategy_name(self):
        return self._strategy_name

    def _set_strategy_name(self, value):
        # Nothing to do for an empty or unchanged reference.
        if not value or self._strategy_name == value:
            return
        self._strategy_name = None
        strategy = self._get_strategy(value)
        if strategy:
            self._strategy_name = strategy.name

    uuid = wtypes.wsattr(types.uuid, readonly=True)
    """Unique UUID for this audit template"""

    name = wtypes.text
    """Name of this audit template"""

    description = wtypes.wsattr(wtypes.text, mandatory=False)
    """Short description of this audit template"""

    goal_uuid = wtypes.wsproperty(
        wtypes.text, _get_goal_uuid, _set_goal_uuid, mandatory=True)
    """Goal UUID the audit template refers to"""

    goal_name = wtypes.wsproperty(
        wtypes.text, _get_goal_name, _set_goal_name, mandatory=False)
    """The name of the goal this audit template refers to"""

    strategy_uuid = wtypes.wsproperty(
        wtypes.text, _get_strategy_uuid, _set_strategy_uuid, mandatory=False)
    """Strategy UUID the audit template refers to"""

    strategy_name = wtypes.wsproperty(
        wtypes.text, _get_strategy_name, _set_strategy_name, mandatory=False)
    """The name of the strategy this audit template refers to"""

    audits = wtypes.wsattr([link.Link], readonly=True)
    """Links to the collection of audits contained in this audit template"""

    links = wtypes.wsattr([link.Link], readonly=True)
    """A list containing a self link and associated audit template links"""

    scope = wtypes.wsattr(types.jsontype, mandatory=False)
    """Audit Scope"""

    def __init__(self, **kwargs):
        """Populate the API object from keyword arguments.

        The goal and strategy properties are all fed from the ``goal_id``
        and ``strategy_id`` keyword arguments.
        """
        super().__init__()
        self.fields = []

        for field_name in list(objects.AuditTemplate.fields):
            # Only the fields exposed by this API type are kept.
            if hasattr(self, field_name):
                self.fields.append(field_name)
                setattr(self, field_name,
                        kwargs.get(field_name, wtypes.Unset))

        self.fields.extend(['goal_id', 'strategy_id'])
        self.strategy_id = kwargs.get('strategy_id', wtypes.Unset)

        # goal_uuid & strategy_uuid are API-only attributes, hence absent
        # from objects.AuditTemplate.fields.
        self.fields.extend(
            ['goal_uuid', 'goal_name', 'strategy_uuid', 'strategy_name'])
        goal_ref = kwargs.get('goal_id', wtypes.Unset)
        strategy_ref = kwargs.get('strategy_id', wtypes.Unset)
        self.goal_uuid = goal_ref
        self.goal_name = goal_ref
        self.strategy_uuid = strategy_ref
        self.strategy_name = strategy_ref

    @staticmethod
    def _convert_with_links(audit_template, url, expand=True):
        """Hide internal IDs and attach self/bookmark links.

        :param audit_template: the ``AuditTemplate`` to decorate
        :param url: base URL used to build the links
        :param expand: when False, only a summary set of fields is kept
        :returns: the same ``audit_template`` object
        """
        if not expand:
            summary_fields = ['uuid', 'name', 'goal_uuid', 'goal_name',
                              'scope', 'strategy_uuid', 'strategy_name']
            audit_template.unset_fields_except(summary_fields)

        # Numeric IDs are internal only and must not reach the user.
        audit_template.goal_id = wtypes.Unset
        audit_template.strategy_id = wtypes.Unset

        self_link = link.Link.make_link(
            'self', url, 'audit_templates', audit_template.uuid)
        bookmark_link = link.Link.make_link(
            'bookmark', url, 'audit_templates', audit_template.uuid,
            bookmark=True)
        audit_template.links = [self_link, bookmark_link]
        return audit_template

    @classmethod
    def convert_with_links(cls, rpc_audit_template, expand=True):
        """Convert an audit template object into its API representation."""
        api_audit_template = AuditTemplate(**rpc_audit_template.as_dict())
        hide_fields_in_newer_versions(api_audit_template)
        return cls._convert_with_links(
            api_audit_template, pecan.request.host_url, expand)

    @classmethod
    def sample(cls, expand=True):
        """Return a sample audit template."""
        sample_values = dict(
            uuid='27e3153e-d5bf-4b7e-b517-fb518e17f34c',
            name='My Audit Template',
            description='Description of my audit template',
            goal_uuid='83e44733-b640-40e2-8d8a-7dd3be7134e6',
            strategy_uuid='367d826e-b6a4-4b70-bc44-c3f6fe1c9986',
            created_at=timeutils.utcnow(),
            deleted_at=None,
            updated_at=timeutils.utcnow(),
            scope=[],
        )
        return cls._convert_with_links(
            cls(**sample_values), 'http://localhost:9322', expand)
class AuditTemplateCollection(collection.Collection):
    """API representation of a collection of audit templates."""

    audit_templates = [AuditTemplate]
    """A list containing audit templates objects"""

    def __init__(self, **kwargs):
        super().__init__()
        self._type = 'audit_templates'

    @staticmethod
    def convert_with_links(rpc_audit_templates, limit, url=None, expand=False,
                           **kwargs):
        """Build a collection from a list of audit template objects.

        :param rpc_audit_templates: the audit templates to convert
        :param limit: page size, handed over to ``get_next``
        :param url: resource URL, handed over to ``get_next``
        :param expand: whether each audit template is fully detailed
        :param kwargs: extra arguments handed over to ``get_next``
        :returns: the populated ``AuditTemplateCollection``
        """
        result = AuditTemplateCollection()
        converted = []
        for rpc_audit_template in rpc_audit_templates:
            converted.append(
                AuditTemplate.convert_with_links(rpc_audit_template, expand))
        result.audit_templates = converted
        result.next = result.get_next(limit, url=url, **kwargs)
        return result

    @classmethod
    def sample(cls):
        """Return a sample collection holding one summarized template."""
        collection_sample = cls()
        collection_sample.audit_templates = [
            AuditTemplate.sample(expand=False)]
        return collection_sample
class AuditTemplatesController(rest.RestController):
    """REST controller for AuditTemplates."""

    def __init__(self):
        super(AuditTemplatesController, self).__init__()

    from_audit_templates = False
    """A flag to indicate if the requests to this controller are coming
    from the top-level resource AuditTemplates."""

    _custom_actions = {
        'detail': ['GET'],
    }

    @staticmethod
    def _build_filters(goal, strategy):
        """Translate the goal/strategy query parameters into filters.

        A UUID-like value filters on the ``*_uuid`` field, any other
        non-empty value on the ``*_name`` field.

        :param goal: goal UUID or name, or None
        :param strategy: strategy UUID or name, or None
        :returns: dict of search filters, empty when nothing was given
        """
        filters = {}
        if goal:
            goal_key = ('goal_uuid' if common_utils.is_uuid_like(goal)
                        else 'goal_name')
            filters[goal_key] = goal
        if strategy:
            strategy_key = ('strategy_uuid'
                            if common_utils.is_uuid_like(strategy)
                            else 'strategy_name')
            filters[strategy_key] = strategy
        return filters

    def _get_audit_templates_collection(self, filters, marker, limit,
                                        sort_key, sort_dir, expand=False,
                                        resource_url=None):
        """Validate the listing parameters and return the collection.

        :param filters: dict of search filters
        :param marker: UUID of the audit template used as pagination
                       marker, or None
        :param limit: maximum number of resources to return
        :param sort_key: field to sort results by
        :param sort_dir: direction to sort, "asc" or "desc"
        :param expand: whether each audit template is fully detailed
        :param resource_url: URL used to build the link to the next page
        :returns: an ``AuditTemplateCollection``
        """
        # API-only fields: accepted for filtering and sorting on top of
        # the fields of the audit template object.
        additional_fields = ["goal_uuid", "goal_name", "strategy_uuid",
                             "strategy_name"]
        allowed_fields = (
            list(objects.AuditTemplate.fields) + additional_fields)

        api_utils.validate_sort_key(sort_key, allowed_fields)
        api_utils.validate_search_filters(filters, allowed_fields)
        limit = api_utils.validate_limit(limit)
        api_utils.validate_sort_dir(sort_dir)

        marker_obj = None
        if marker:
            marker_obj = objects.AuditTemplate.get_by_uuid(
                pecan.request.context,
                marker)

        # No sort key is handed to the DB layer when the sort has to be
        # done at the API level.
        need_api_sort = api_utils.check_need_api_sort(sort_key,
                                                      additional_fields)
        sort_db_key = (sort_key if not need_api_sort
                       else None)

        audit_templates = objects.AuditTemplate.list(
            pecan.request.context, filters, limit, marker_obj,
            sort_key=sort_db_key, sort_dir=sort_dir)

        audit_templates_collection = \
            AuditTemplateCollection.convert_with_links(
                audit_templates, limit, url=resource_url, expand=expand,
                sort_key=sort_key, sort_dir=sort_dir)

        if need_api_sort:
            api_utils.make_api_sort(
                audit_templates_collection.audit_templates, sort_key,
                sort_dir)

        return audit_templates_collection

    @wsme_pecan.wsexpose(AuditTemplateCollection, wtypes.text, wtypes.text,
                         types.uuid, int, wtypes.text, wtypes.text)
    def get_all(self, goal=None, strategy=None, marker=None,
                limit=None, sort_key='id', sort_dir='asc'):
        """Retrieve a list of audit templates.

        :param goal: goal UUID or name to filter by
        :param strategy: strategy UUID or name to filter by
        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        """
        context = pecan.request.context
        policy.enforce(context, 'audit_template:get_all',
                       action='audit_template:get_all')

        filters = self._build_filters(goal, strategy)
        return self._get_audit_templates_collection(
            filters, marker, limit, sort_key, sort_dir)

    @wsme_pecan.wsexpose(AuditTemplateCollection, wtypes.text, wtypes.text,
                         types.uuid, int, wtypes.text, wtypes.text)
    def detail(self, goal=None, strategy=None, marker=None,
               limit=None, sort_key='id', sort_dir='asc'):
        """Retrieve a list of audit templates with detail.

        :param goal: goal UUID or name to filter by
        :param strategy: strategy UUID or name to filter by
        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        """
        context = pecan.request.context
        policy.enforce(context, 'audit_template:detail',
                       action='audit_template:detail')

        # NOTE(lucasagomes): /detail should only work against collections
        parent = pecan.request.path.split('/')[:-1][-1]
        if parent != "audit_templates":
            raise exception.HTTPNotFound

        filters = self._build_filters(goal, strategy)

        expand = True
        resource_url = '/'.join(['audit_templates', 'detail'])
        return self._get_audit_templates_collection(filters, marker, limit,
                                                    sort_key, sort_dir, expand,
                                                    resource_url)

    @wsme_pecan.wsexpose(AuditTemplate, wtypes.text)
    def get_one(self, audit_template):
        """Retrieve information about the given audit template.

        :param audit_template: UUID or name of an audit template.
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        rpc_audit_template = api_utils.get_resource('AuditTemplate',
                                                    audit_template)
        policy.enforce(context, 'audit_template:get', rpc_audit_template,
                       action='audit_template:get')

        return AuditTemplate.convert_with_links(rpc_audit_template)

    @wsme.validate(types.uuid, AuditTemplatePostType)
    @wsme_pecan.wsexpose(AuditTemplate, body=AuditTemplatePostType,
                         status_code=HTTPStatus.CREATED)
    def post(self, audit_template_postdata):
        """Create a new audit template.

        :param audit_template_postdata: the audit template POST data
                                        from the request body.
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        policy.enforce(context, 'audit_template:create',
                       action='audit_template:create')

        audit_template = audit_template_postdata.as_audit_template()
        audit_template_dict = audit_template.as_dict()
        new_audit_template = objects.AuditTemplate(context,
                                                   **audit_template_dict)
        new_audit_template.create()

        # Set the HTTP Location Header
        pecan.response.location = link.build_url(
            'audit_templates', new_audit_template.uuid)
        return AuditTemplate.convert_with_links(new_audit_template)

    @wsme.validate(types.uuid, [AuditTemplatePatchType])
    @wsme_pecan.wsexpose(AuditTemplate, wtypes.text,
                         body=[AuditTemplatePatchType])
    def patch(self, audit_template, patch):
        """Update an existing audit template.

        :param audit_template: UUID or name of an audit template.
        :param patch: a json PATCH document to apply to this audit template.
        :raises: PatchError if the patch document cannot be applied
        """
        if self.from_audit_templates:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        audit_template_to_update = api_utils.get_resource('AuditTemplate',
                                                          audit_template)
        policy.enforce(context, 'audit_template:update',
                       audit_template_to_update,
                       action='audit_template:update')

        # NOTE(review): the audit template is loaded a second time here;
        # presumably redundant with get_resource() above - confirm before
        # removing.
        if common_utils.is_uuid_like(audit_template):
            audit_template_to_update = objects.AuditTemplate.get_by_uuid(
                pecan.request.context,
                audit_template)
        else:
            audit_template_to_update = objects.AuditTemplate.get_by_name(
                pecan.request.context,
                audit_template)

        try:
            audit_template_dict = audit_template_to_update.as_dict()
            patched_audit_template = AuditTemplate(
                **api_utils.apply_jsonpatch(audit_template_dict, patch))
        except api_utils.JSONPATCH_EXCEPTIONS as e:
            raise exception.PatchError(patch=patch, reason=e) from e

        # Update only the fields that have changed
        for field in objects.AuditTemplate.fields:
            try:
                patch_val = getattr(patched_audit_template, field)
            except AttributeError:
                # Ignore fields that aren't exposed in the API
                continue
            if patch_val == wtypes.Unset:
                patch_val = None
            if audit_template_to_update[field] != patch_val:
                audit_template_to_update[field] = patch_val

        audit_template_to_update.save()
        return AuditTemplate.convert_with_links(audit_template_to_update)

    @wsme_pecan.wsexpose(None, wtypes.text, status_code=HTTPStatus.NO_CONTENT)
    def delete(self, audit_template):
        """Delete an audit template.

        :param audit_template: UUID or name of an audit template.
        """
        context = pecan.request.context
        audit_template_to_delete = api_utils.get_resource('AuditTemplate',
                                                          audit_template)
        policy.enforce(context, 'audit_template:delete',
                       audit_template_to_delete,
                       action='audit_template:delete')

        audit_template_to_delete.soft_delete()