Coverage for watcher/api/controllers/v1/audit.py: 85%
392 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright 2013 Red Hat, Inc.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
18"""
19In the Watcher system, an :ref:`Audit <audit_definition>` is a request for
20optimizing a :ref:`Cluster <cluster_definition>`.
22The optimization is done in order to satisfy one :ref:`Goal <goal_definition>`
23on a given :ref:`Cluster <cluster_definition>`.
25For each :ref:`Audit <audit_definition>`, the Watcher system generates an
26:ref:`Action Plan <action_plan_definition>`.
28To see the life-cycle and description of an :ref:`Audit <audit_definition>`
29states, visit :ref:`the Audit State machine <audit_state_machine>`.
30"""
32import datetime
33from dateutil import tz
35from http import HTTPStatus
36import jsonschema
37from oslo_log import log
38from oslo_utils import timeutils
39import pecan
40from pecan import rest
41import wsme
42from wsme import types as wtypes
43from wsme import utils as wutils
44import wsmeext.pecan as wsme_pecan
46from watcher._i18n import _
47from watcher.api.controllers import base
48from watcher.api.controllers import link
49from watcher.api.controllers.v1 import collection
50from watcher.api.controllers.v1 import types
51from watcher.api.controllers.v1 import utils as api_utils
52from watcher.common import exception
53from watcher.common import policy
54from watcher.common import utils
55from watcher.decision_engine import rpcapi
56from watcher import objects
58LOG = log.getLogger(__name__)
def _get_object_by_value(context, class_name, value):
    """Look an object up by identifier or by name, depending on ``value``.

    :param context: request context handed to the lookup method.
    :param class_name: object class exposing ``get`` and ``get_by_name``.
    :param value: a UUID-like or integer-like identifier, otherwise a name.
    :returns: whatever the selected lookup method returns.
    """
    is_identifier = utils.is_uuid_like(value) or utils.is_int_like(value)
    finder = class_name.get if is_identifier else class_name.get_by_name
    return finder(context, value)
def hide_fields_in_newer_versions(obj):
    """Unset the fields that the requested API version must not see.

    ``start_time`` and ``end_time`` are cleared unless
    ``api_utils.allow_start_end_audit_time()`` is true, and ``force`` is
    cleared unless ``api_utils.allow_force()`` is true.

    :param obj: API object whose attributes are reset in place.
    """
    if not api_utils.allow_start_end_audit_time():
        for attr in ('start_time', 'end_time'):
            setattr(obj, attr, wtypes.Unset)
    if not api_utils.allow_force():
        obj.force = wtypes.Unset
class AuditPostType(wtypes.Base):
    """Request body accepted when an audit is created."""

    name = wtypes.wsattr(wtypes.text, mandatory=False)

    audit_template_uuid = wtypes.wsattr(types.uuid, mandatory=False)

    goal = wtypes.wsattr(wtypes.text, mandatory=False)

    strategy = wtypes.wsattr(wtypes.text, mandatory=False)

    audit_type = wtypes.wsattr(wtypes.text, mandatory=True)

    state = wtypes.wsattr(wtypes.text, readonly=True,
                          default=objects.audit.State.PENDING)

    parameters = wtypes.wsattr({wtypes.text: types.jsontype}, mandatory=False,
                               default={})
    interval = wtypes.wsattr(types.interval_or_cron, mandatory=False)

    scope = wtypes.wsattr(types.jsontype, readonly=True)

    auto_trigger = wtypes.wsattr(bool, mandatory=False)

    hostname = wtypes.wsattr(wtypes.text, readonly=True, mandatory=False)

    start_time = wtypes.wsattr(datetime.datetime, mandatory=False)

    end_time = wtypes.wsattr(datetime.datetime, mandatory=False)

    force = wtypes.wsattr(bool, mandatory=False)

    def as_audit(self, context):
        """Validate this request body and convert it into an ``Audit``.

        Fields missing from the request are filled in from the referenced
        audit template, and a default name is generated when none was given.

        :param context: request context used for the object lookups.
        :returns: an ``Audit`` built from the validated fields.
        :raises: AuditTypeNotFound, Invalid, AuditIntervalNotAllowed,
                 AuditIntervalNotSpecified, AuditStartEndTimeNotAllowed,
                 NotAcceptable
        """
        oneshot = objects.audit.AuditType.ONESHOT.value
        continuous = objects.audit.AuditType.CONTINUOUS.value
        absent = (wtypes.Unset, None)

        known_types = [member.value for member in objects.audit.AuditType]
        if self.audit_type not in known_types:
            raise exception.AuditTypeNotFound(audit_type=self.audit_type)

        if not (self.audit_template_uuid or self.goal):
            message = _(
                'A valid goal or audit_template_id must be provided')
            raise exception.Invalid(message)

        # The interval is forbidden for one-shot audits and required for
        # continuous ones.
        if self.audit_type == oneshot and self.interval not in absent:
            raise exception.AuditIntervalNotAllowed(audit_type=self.audit_type)

        if self.audit_type == continuous and self.interval in absent:
            raise exception.AuditIntervalNotSpecified(
                audit_type=self.audit_type)

        if self.audit_template_uuid and self.goal:
            raise exception.Invalid('Either audit_template_uuid '
                                    'or goal should be provided.')

        if self.audit_type == oneshot and (
                self.start_time not in absent or
                self.end_time not in absent):
            raise exception.AuditStartEndTimeNotAllowed(
                audit_type=self.audit_type)

        if not api_utils.allow_start_end_audit_time():
            if any(getattr(self, attr) not in absent
                   for attr in ('start_time', 'end_time')):
                raise exception.NotAcceptable()

        # Values taken from the audit template only fill the gaps; anything
        # supplied in the request itself is kept.
        if self.audit_template_uuid:
            try:
                template = objects.AuditTemplate.get(
                    context, self.audit_template_uuid)
            except exception.AuditTemplateNotFound:
                raise exception.Invalid(
                    message=_('The audit template UUID or name specified is '
                              'invalid'))
            inherited = (
                ('goal', 'goal_id'),
                ('strategy', 'strategy_id'),
                ('scope', 'scope'),
            )
            stringified = {'goal', 'strategy'}
            for attr, template_attr in inherited:
                if getattr(self, attr):
                    continue
                try:
                    template_value = getattr(template, template_attr)
                    if template_value and attr in stringified:
                        template_value = str(template_value)
                    setattr(self, attr, template_value)
                except AttributeError:
                    pass

        # Without an explicit name, derive one from the strategy, the audit
        # template or the goal (in that order) plus the current UTC time.
        if not self.name:
            if self.strategy:
                named = _get_object_by_value(context, objects.Strategy,
                                             self.strategy)
            elif self.audit_template_uuid:
                named = objects.AuditTemplate.get(
                    context, self.audit_template_uuid)
            else:
                named = _get_object_by_value(context, objects.Goal,
                                             self.goal)
            self.name = "%s-%s" % (named.name,
                                   timeutils.utcnow().isoformat())
        # Names are capped at 63 characters.
        if len(self.name) > 63:
            LOG.warning("Audit: %s length exceeds 63 characters",
                        self.name)
            self.name = self.name[0:63]

        return Audit(
            name=self.name,
            audit_type=self.audit_type,
            parameters=self.parameters,
            goal_id=self.goal,
            strategy_id=self.strategy,
            interval=self.interval,
            scope=self.scope,
            auto_trigger=self.auto_trigger,
            start_time=self.start_time,
            end_time=self.end_time,
            force=self.force)
class AuditPatchType(types.JsonPatchType):
    """JSON patch entry type used when updating an audit."""

    @staticmethod
    def mandatory_attrs():
        """Return the patch paths that may never be modified."""
        return ['/audit_template_uuid', '/type']

    @staticmethod
    def validate(patch):
        """Reject patches on protected paths, then run the base validation.

        A patch is refused when it targets one of ``mandatory_attrs()`` or
        when it replaces ``/state`` with ``None``.

        :param patch: patch entry exposing ``path``, ``op`` and ``value``.
        :returns: the result of ``types.JsonPatchType.validate``.
        :raises: PatchError when the targeted field cannot be updated.
        """
        serialized_patch = {'path': patch.path,
                            'op': patch.op,
                            'value': patch.value}
        forbidden = patch.path in AuditPatchType.mandatory_attrs()
        if not forbidden:
            forbidden = (patch.path == '/state' and
                         patch.op == 'replace' and
                         patch.value is None)
        if not forbidden:
            return types.JsonPatchType.validate(patch)

        msg = _("%(field)s can't be updated.")
        raise exception.PatchError(
            patch=serialized_patch,
            reason=msg % dict(field=patch.path))
class Audit(base.APIBase):
    """API-facing view of an audit.

    Declares the typed attributes exposed through the REST API and converts
    between the stored audit object and that representation. The goal and
    strategy attributes are properties that resolve the referenced object
    whenever they are assigned.
    """

    _goal_uuid = None
    _goal_name = None
    _strategy_uuid = None
    _strategy_name = None

    def _get_goal(self, value):
        """Resolve ``value`` to a goal and record its id on this audit.

        :param value: goal UUID, integer id or name.
        :returns: the goal, or None when ``value`` is unset or no goal
                  matches.
        """
        if value == wtypes.Unset:
            return None
        try:
            if utils.is_uuid_like(value) or utils.is_int_like(value):
                finder = objects.Goal.get
            else:
                finder = objects.Goal.get_by_name
            found = finder(pecan.request.context, value)
        except exception.GoalNotFound:
            found = None
        if found:
            self.goal_id = found.id
        return found

    # Accessors backing the ``goal_uuid`` property.
    def _get_goal_uuid(self):
        return self._goal_uuid

    def _set_goal_uuid(self, value):
        if not (value and self._goal_uuid != value):
            return
        # Reset first so a failed lookup leaves the attribute empty.
        self._goal_uuid = None
        found = self._get_goal(value)
        if found:
            self._goal_uuid = found.uuid

    # Accessors backing the ``goal_name`` property.
    def _get_goal_name(self):
        return self._goal_name

    def _set_goal_name(self, value):
        if not (value and self._goal_name != value):
            return
        self._goal_name = None
        found = self._get_goal(value)
        if found:
            self._goal_name = found.name

    def _get_strategy(self, value):
        """Resolve ``value`` to a strategy and record its id on this audit.

        :param value: strategy UUID, integer id or name.
        :returns: the strategy, or None when ``value`` is unset or no
                  strategy matches.
        """
        if value == wtypes.Unset:
            return None
        try:
            if utils.is_uuid_like(value) or utils.is_int_like(value):
                finder = objects.Strategy.get
            else:
                finder = objects.Strategy.get_by_name
            found = finder(pecan.request.context, value)
        except exception.StrategyNotFound:
            found = None
        if found:
            self.strategy_id = found.id
        return found

    # Accessors backing the ``strategy_uuid`` property.
    def _get_strategy_uuid(self):
        return self._strategy_uuid

    def _set_strategy_uuid(self, value):
        if not (value and self._strategy_uuid != value):
            return
        self._strategy_uuid = None
        found = self._get_strategy(value)
        if found:
            self._strategy_uuid = found.uuid

    # Accessors backing the ``strategy_name`` property.
    def _get_strategy_name(self):
        return self._strategy_name

    def _set_strategy_name(self, value):
        if not (value and self._strategy_name != value):
            return
        self._strategy_name = None
        found = self._get_strategy(value)
        if found:
            self._strategy_name = found.name

    uuid = types.uuid
    """Unique UUID of this audit"""

    name = wtypes.text
    """Name of this audit"""

    audit_type = wtypes.text
    """Type of this audit"""

    state = wtypes.text
    """Current state of this audit"""

    goal_uuid = wtypes.wsproperty(
        wtypes.text, _get_goal_uuid, _set_goal_uuid, mandatory=True)
    """UUID of the goal this audit refers to"""

    goal_name = wtypes.wsproperty(
        wtypes.text, _get_goal_name, _set_goal_name, mandatory=False)
    """Name of the goal this audit refers to"""

    strategy_uuid = wtypes.wsproperty(
        wtypes.text, _get_strategy_uuid, _set_strategy_uuid, mandatory=False)
    """UUID of the strategy this audit refers to"""

    strategy_name = wtypes.wsproperty(
        wtypes.text, _get_strategy_name, _set_strategy_name, mandatory=False)
    """Name of the strategy this audit refers to"""

    parameters = {wtypes.text: types.jsontype}
    """Strategy parameters for this audit"""

    links = wtypes.wsattr([link.Link], readonly=True)
    """A list containing a self link and associated audit links"""

    interval = wtypes.wsattr(wtypes.text, mandatory=False)
    """Launch the audit periodically (in seconds)"""

    scope = wtypes.wsattr(types.jsontype, mandatory=False)
    """Audit scope"""

    auto_trigger = wtypes.wsattr(bool, mandatory=False, default=False)
    """Automatically execute the action plan once the audit has succeeded"""

    next_run_time = wtypes.wsattr(datetime.datetime, mandatory=False)
    """Next time the audit will be launched"""

    hostname = wtypes.wsattr(wtypes.text, mandatory=False)
    """Hostname the audit is running on"""

    start_time = wtypes.wsattr(datetime.datetime, mandatory=False)
    """Start time for launching a continuous audit"""

    end_time = wtypes.wsattr(datetime.datetime, mandatory=False)
    """End time at which a continuous audit stops"""

    force = wsme.wsattr(bool, mandatory=False, default=False)
    """Allow the action plan of this audit to be executed in parallel
       with other action plans"""

    def __init__(self, **kwargs):
        """Populate the exposed attributes from ``kwargs``.

        Every field of ``objects.Audit`` that this class exposes is set from
        ``kwargs`` (``wtypes.Unset`` when missing). The goal and strategy
        properties are then fed from ``goal_id`` and ``strategy_id``.
        """
        self.fields = []
        for field in list(objects.Audit.fields):
            # Only fields declared on this API class are exposed.
            if hasattr(self, field):
                self.fields.append(field)
                setattr(self, field, kwargs.get(field, wtypes.Unset))

        self.fields.extend(['goal_id', 'strategy_id'])

        # Assigning these properties triggers the goal/strategy lookups.
        derived = (
            ('goal_uuid', 'goal_id'),
            ('goal_name', 'goal_id'),
            ('strategy_uuid', 'strategy_id'),
            ('strategy_name', 'strategy_id'),
        )
        for attr, source_key in derived:
            setattr(self, attr, kwargs.get(source_key, wtypes.Unset))

    @staticmethod
    def _convert_with_links(audit, url, expand=True):
        """Attach the self/bookmark links, trimming fields unless expanded.

        :param audit: the ``Audit`` to decorate in place.
        :param url: base URL used to build the links.
        :param expand: when false, only the summary fields are kept.
        :returns: the same ``audit`` instance.
        """
        if not expand:
            summary_fields = ['uuid', 'name', 'audit_type', 'state',
                              'goal_uuid', 'interval', 'scope',
                              'strategy_uuid', 'goal_name',
                              'strategy_name', 'auto_trigger',
                              'next_run_time']
            audit.unset_fields_except(summary_fields)

        self_link = link.Link.make_link('self', url, 'audits', audit.uuid)
        bookmark_link = link.Link.make_link('bookmark', url, 'audits',
                                            audit.uuid, bookmark=True)
        audit.links = [self_link, bookmark_link]

        return audit

    @classmethod
    def convert_with_links(cls, rpc_audit, expand=True):
        """Build the API representation of ``rpc_audit``, links included.

        :param rpc_audit: audit object providing ``as_dict()``.
        :param expand: forwarded to ``_convert_with_links``.
        """
        api_audit = Audit(**rpc_audit.as_dict())
        hide_fields_in_newer_versions(api_audit)
        return cls._convert_with_links(
            api_audit, pecan.request.host_url, expand)

    @classmethod
    def sample(cls, expand=True):
        """Return an example audit linked against a localhost URL."""
        attrs = dict(
            uuid='27e3153e-d5bf-4b7e-b517-fb518e17f34c',
            name='My Audit',
            audit_type='ONESHOT',
            state='PENDING',
            created_at=timeutils.utcnow(),
            deleted_at=None,
            updated_at=timeutils.utcnow(),
            interval='7200',
            scope=[],
            auto_trigger=False,
            next_run_time=timeutils.utcnow(),
            start_time=timeutils.utcnow(),
            end_time=timeutils.utcnow(),
        )
        example = cls(**attrs)

        example.goal_id = '7ae81bb3-dec3-4289-8d6c-da80bd8001ae'
        example.strategy_id = '7ae81bb3-dec3-4289-8d6c-da80bd8001ff'

        return cls._convert_with_links(
            example, 'http://localhost:9322', expand)
class AuditCollection(collection.Collection):
    """API representation of a collection of audits."""

    audits = [Audit]
    """A list containing audit objects"""

    def __init__(self, **kwargs):
        """Create an empty collection whose type is ``'audits'``."""
        super(AuditCollection, self).__init__()
        self._type = 'audits'

    @staticmethod
    def convert_with_links(rpc_audits, limit, url=None, expand=False,
                           **kwargs):
        """Wrap ``rpc_audits`` into a collection with a ``next`` link.

        :param rpc_audits: iterable of audit objects to convert.
        :param limit: page size handed to ``get_next``.
        :param url: resource URL handed to ``get_next``.
        :param expand: whether each audit is rendered with all its fields.
        :param kwargs: extra arguments forwarded to ``get_next``.
        """
        result = AuditCollection()
        result.audits = [Audit.convert_with_links(rpc_audit, expand)
                         for rpc_audit in rpc_audits]
        result.next = result.get_next(limit, url=url, **kwargs)
        return result

    @classmethod
    def sample(cls):
        """Return an example collection holding one non-expanded audit."""
        example = cls()
        example.audits = [Audit.sample(expand=False)]
        return example
class AuditsController(rest.RestController):
    """REST controller for Audits."""

    def __init__(self):
        super(AuditsController, self).__init__()
        # RPC client used to ask the decision engine to run audits.
        self.dc_client = rpcapi.DecisionEngineAPI()

    from_audits = False
    """A flag to indicate if the requests to this controller are coming
    from the top-level resource Audits."""

    _custom_actions = {
        'detail': ['GET'],
    }

    def _get_audits_collection(self, marker, limit,
                               sort_key, sort_dir, expand=False,
                               resource_url=None, goal=None,
                               strategy=None):
        """List audits and wrap them into an ``AuditCollection``.

        :param marker: UUID of the audit to start after, if any.
        :param limit: maximum number of audits to return.
        :param sort_key: field to sort by.
        :param sort_dir: sort direction.
        :param expand: whether audits are rendered with all their fields.
        :param resource_url: URL used when building the ``next`` link.
        :param goal: goal UUID or name to filter by.
        :param strategy: strategy UUID or name to filter by.
        """
        api_only_sort_keys = ["goal_uuid", "goal_name", "strategy_uuid",
                              "strategy_name"]

        api_utils.validate_sort_key(
            sort_key, list(objects.Audit.fields) + api_only_sort_keys)
        limit = api_utils.validate_limit(limit)
        api_utils.validate_sort_dir(sort_dir)

        marker_obj = None
        if marker:
            marker_obj = objects.Audit.get_by_uuid(pecan.request.context,
                                                   marker)

        filters = {}
        if goal:
            # TODO(michaelgugino): add method to get goal by name.
            goal_key = 'goal_uuid' if utils.is_uuid_like(goal) else 'goal_name'
            filters[goal_key] = goal

        if strategy:
            # TODO(michaelgugino): add method to get strategy by name.
            strategy_key = ('strategy_uuid' if utils.is_uuid_like(strategy)
                            else 'strategy_name')
            filters[strategy_key] = strategy

        # When check_need_api_sort() says so, sorting is done after the
        # query with make_api_sort() and no sort key is passed to the list
        # call.
        sort_in_api = api_utils.check_need_api_sort(sort_key,
                                                    api_only_sort_keys)
        db_sort_key = None if sort_in_api else sort_key

        audits = objects.Audit.list(pecan.request.context,
                                    limit,
                                    marker_obj, sort_key=db_sort_key,
                                    sort_dir=sort_dir, filters=filters)

        audits_collection = AuditCollection.convert_with_links(
            audits, limit, url=resource_url, expand=expand,
            sort_key=sort_key, sort_dir=sort_dir)

        if sort_in_api:
            api_utils.make_api_sort(audits_collection.audits, sort_key,
                                    sort_dir)

        return audits_collection

    @wsme_pecan.wsexpose(AuditCollection, types.uuid, int, wtypes.text,
                         wtypes.text, wtypes.text, wtypes.text)
    def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc',
                goal=None, strategy=None):
        """Retrieve a list of audits.

        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        :param goal: goal UUID or name to filter by
        :param strategy: strategy UUID or name to filter by
        """
        policy.enforce(pecan.request.context, 'audit:get_all',
                       action='audit:get_all')

        return self._get_audits_collection(marker, limit, sort_key,
                                           sort_dir, goal=goal,
                                           strategy=strategy)

    @wsme_pecan.wsexpose(AuditCollection, wtypes.text, types.uuid, int,
                         wtypes.text, wtypes.text)
    def detail(self, goal=None, marker=None, limit=None,
               sort_key='id', sort_dir='asc'):
        """Retrieve a list of audits with detail.

        :param goal: goal UUID or name to filter by
        :param marker: pagination marker for large data sets.
        :param limit: maximum number of resources to return in a single result.
        :param sort_key: column to sort results by. Default: id.
        :param sort_dir: direction to sort. "asc" or "desc". Default: asc.
        """
        policy.enforce(pecan.request.context, 'audit:detail',
                       action='audit:detail')
        # NOTE(lucasagomes): /detail should only work against collections
        parent = pecan.request.path.split('/')[-2]
        if parent != "audits":
            raise exception.HTTPNotFound

        return self._get_audits_collection(marker, limit,
                                           sort_key, sort_dir,
                                           True,
                                           'audits/detail',
                                           goal=goal)

    @wsme_pecan.wsexpose(Audit, wtypes.text)
    def get_one(self, audit):
        """Retrieve information about the given audit.

        :param audit: UUID or name of an audit.
        """
        if self.from_audits:
            raise exception.OperationNotPermitted

        rpc_audit = api_utils.get_resource('Audit', audit)
        policy.enforce(pecan.request.context, 'audit:get', rpc_audit,
                       action='audit:get')

        return Audit.convert_with_links(rpc_audit)

    @wsme_pecan.wsexpose(Audit, body=AuditPostType,
                         status_code=HTTPStatus.CREATED)
    def post(self, audit_p):
        """Create a new audit.

        :param audit_p: an audit within the request body.
        """
        def local_to_naive_utc(moment):
            # Stamp the value as local time, shift it to UTC and drop the
            # timezone again.
            localized = moment.replace(tzinfo=tz.tzlocal())
            return localized.astimezone(tz.tzutc()).replace(tzinfo=None)

        context = pecan.request.context
        policy.enforce(context, 'audit:create',
                       action='audit:create')
        audit = audit_p.as_audit(context)

        if self.from_audits:
            raise exception.OperationNotPermitted

        strategy_uuid = audit.strategy_uuid
        schema = None
        if strategy_uuid is not None:
            # validate parameter when predefined strategy in audit template
            strategy = objects.Strategy.get(pecan.request.context,
                                            strategy_uuid)
            schema = strategy.parameters_spec
            if schema:
                # validate input parameter with default value feedback
                validator = utils.StrictDefaultValidatingDraft4Validator(
                    schema)
                try:
                    validator.validate(audit.parameters)
                except jsonschema.exceptions.ValidationError as e:
                    raise exception.Invalid(
                        _('Invalid parameters for strategy: %s') % e)

        # Parameters only make sense together with a strategy that
        # publishes a parameter spec.
        if not schema and audit.parameters:
            raise exception.Invalid(_('Specify parameters but no predefined '
                                      'strategy for audit, or no '
                                      'parameter spec in predefined strategy'))

        audit_dict = audit.as_dict()
        # convert local time to UTC time
        for time_key in ('start_time', 'end_time'):
            moment = audit_dict.get(time_key)
            if moment:
                audit_dict[time_key] = local_to_naive_utc(moment)

        new_audit = objects.Audit(context, **audit_dict)
        new_audit.create()

        # Set the HTTP Location Header
        pecan.response.location = link.build_url('audits', new_audit.uuid)

        # trigger decision-engine to run the audit
        if new_audit.audit_type == objects.audit.AuditType.ONESHOT.value:
            self.dc_client.trigger_audit(context, new_audit.uuid)

        return Audit.convert_with_links(new_audit)

    @wsme.validate(types.uuid, [AuditPatchType])
    @wsme_pecan.wsexpose(Audit, wtypes.text, body=[AuditPatchType])
    def patch(self, audit, patch):
        """Update an existing audit.

        :param audit: UUID or name of an audit.
        :param patch: a json PATCH document to apply to this audit.
        """
        if self.from_audits:
            raise exception.OperationNotPermitted

        context = pecan.request.context
        audit_to_update = api_utils.get_resource(
            'Audit', audit, eager=True)
        policy.enforce(context, 'audit:update', audit_to_update,
                       action='audit:update')

        try:
            current = audit_to_update.as_dict()

            initial_state = current['state']
            new_state = api_utils.get_patch_value(patch, 'state')
            transition_ok = api_utils.check_audit_state_transition(
                patch, initial_state)
            if not transition_ok:
                error_message = _("State transition not allowed: "
                                  "(%(initial_state)s -> %(new_state)s)")
                raise exception.PatchError(
                    patch=patch,
                    reason=error_message % dict(
                        initial_state=initial_state, new_state=new_state))

            patch_path = api_utils.get_patch_key(patch, 'path')
            if patch_path in ('start_time', 'end_time'):
                raw_time = api_utils.get_patch_value(patch, patch_path)
                # convert string format to UTC time
                localized = wutils.parse_isodatetime(raw_time).replace(
                    tzinfo=tz.tzlocal())
                utc_time = localized.astimezone(
                    tz.tzutc()).replace(tzinfo=None)
                api_utils.set_patch_value(patch, patch_path, utc_time)

            patched = Audit(**api_utils.apply_jsonpatch(current, patch))
        except api_utils.JSONPATCH_EXCEPTIONS as e:
            raise exception.PatchError(patch=patch, reason=e)

        # Update only the fields that have changed
        for field in objects.Audit.fields:
            try:
                new_value = getattr(patched, field)
            except AttributeError:
                # Ignore fields that aren't exposed in the API
                continue
            if new_value == wtypes.Unset:
                new_value = None
            if audit_to_update[field] != new_value:
                audit_to_update[field] = new_value

        audit_to_update.save()
        return Audit.convert_with_links(audit_to_update)

    @wsme_pecan.wsexpose(None, wtypes.text, status_code=HTTPStatus.NO_CONTENT)
    def delete(self, audit):
        """Delete an audit.

        :param audit: UUID or name of an audit.
        """
        audit_to_delete = api_utils.get_resource(
            'Audit', audit, eager=True)
        policy.enforce(pecan.request.context, 'audit:delete',
                       audit_to_delete, action='audit:delete')

        # Deletion is only allowed from states that may move to DELETED.
        current_state = audit_to_delete.state
        transitions = objects.audit.AuditStateTransitionManager()
        if not transitions.check_transition(
                current_state, objects.audit.State.DELETED):
            raise exception.DeleteError(
                state=current_state)

        audit_to_delete.soft_delete()