Coverage for watcher/api/controllers/v1/webhooks.py: 94%
29 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
13"""
14Webhook endpoint for Watcher v1 REST API.
15"""
17from http import HTTPStatus
18from oslo_log import log
19import pecan
20from pecan import rest
21from wsme import types as wtypes
22import wsmeext.pecan as wsme_pecan
24from watcher.api.controllers.v1 import types
25from watcher.api.controllers.v1 import utils
26from watcher.common import exception
27from watcher.decision_engine import rpcapi
28from watcher import objects
30LOG = log.getLogger(__name__)
class WebhookController(rest.RestController):
    """REST controller for the webhooks resource.

    A POST that names an audit asks the decision engine to run it.
    """

    def __init__(self):
        super().__init__()
        # Client used by post() to ask the decision engine to run an audit.
        self.dc_client = rpcapi.DecisionEngineAPI()

    @wsme_pecan.wsexpose(None, wtypes.text, body=types.jsontype,
                         status_code=HTTPStatus.ACCEPTED)
    def post(self, audit_ident, body):
        """Trigger the audit identified by ``audit_ident``.

        The endpoint is exposed with ``HTTPStatus.ACCEPTED`` as its status
        code and no return type.

        :param audit_ident: UUID or name of an audit.
        :param body: JSON request payload; this handler does not read it.
        :raises AuditNotFound: if the lookup yields ``None``.
        :raises AuditTypeNotAllowed: if the audit type is not EVENT.
        :raises AuditStateNotAllowed: if the audit state is neither
            PENDING nor SUCCEEDED.
        """
        LOG.debug("Webhook trigger Audit: %s.", audit_ident)

        request_context = pecan.request.context
        audit = utils.get_resource('Audit', audit_ident)

        # NOTE(review): the coverage report shows this branch was never
        # taken by the test suite; whether get_resource can actually
        # return None is not visible from here -- confirm before removing.
        if audit is None:
            raise exception.AuditNotFound(audit=audit_ident)

        # Only EVENT audits may be started through a webhook.
        event_type = objects.audit.AuditType.EVENT.value
        if audit.audit_type != event_type:
            raise exception.AuditTypeNotAllowed(audit_type=audit.audit_type)

        # An audit can be triggered only from one of these two states.
        if audit.state not in (objects.audit.State.PENDING,
                               objects.audit.State.SUCCEEDED):
            raise exception.AuditStateNotAllowed(state=audit.state)

        # trigger decision-engine to run the audit
        self.dc_client.trigger_audit(request_context, audit.uuid)