Coverage for watcher/api/controllers/v1/webhooks.py: 94%

29 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# Licensed under the Apache License, Version 2.0 (the "License"); you may 

2# not use this file except in compliance with the License. You may obtain 

3# a copy of the License at 

4# 

5# http://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

10# License for the specific language governing permissions and limitations 

11# under the License. 

12 

13""" 

14Webhook endpoint for Watcher v1 REST API. 

15""" 

16 

17from http import HTTPStatus 

18from oslo_log import log 

19import pecan 

20from pecan import rest 

21from wsme import types as wtypes 

22import wsmeext.pecan as wsme_pecan 

23 

24from watcher.api.controllers.v1 import types 

25from watcher.api.controllers.v1 import utils 

26from watcher.common import exception 

27from watcher.decision_engine import rpcapi 

28from watcher import objects 

29 

30LOG = log.getLogger(__name__) 

31 

32 

class WebhookController(rest.RestController):
    """Expose the ``webhooks`` resource of the v1 REST API.

    A POST on this resource asks the decision engine to run an existing
    audit.
    """

    def __init__(self):
        super().__init__()
        # RPC client used by ``post`` to call ``trigger_audit``.
        self.dc_client = rpcapi.DecisionEngineAPI()

    @wsme_pecan.wsexpose(None, wtypes.text, body=types.jsontype,
                         status_code=HTTPStatus.ACCEPTED)
    def post(self, audit_ident, body):
        """Ask the decision engine to run the given audit.

        The audit must be of the EVENT type and currently be in the
        PENDING or SUCCEEDED state; otherwise the request is rejected.

        :param audit_ident: UUID or name of an audit.
        :param body: JSON body of the request; this handler does not
            read it.
        :raises AuditNotFound: if no audit was resolved for
            ``audit_ident``.
        :raises AuditTypeNotAllowed: if the audit is not an EVENT audit.
        :raises AuditStateNotAllowed: if the audit is neither PENDING
            nor SUCCEEDED.
        """
        LOG.debug("Webhook trigger Audit: %s.", audit_ident)

        request_context = pecan.request.context
        audit = utils.get_resource('Audit', audit_ident)

        # NOTE(review): the coverage run never took this branch;
        # presumably get_resource raises on a miss instead of returning
        # None -- confirm before removing the guard.
        if audit is None:
            raise exception.AuditNotFound(audit=audit_ident)

        # Only event-driven audits may be started through a webhook.
        if audit.audit_type != objects.audit.AuditType.EVENT.value:
            raise exception.AuditTypeNotAllowed(audit_type=audit.audit_type)

        triggerable_states = (
            objects.audit.State.PENDING,
            objects.audit.State.SUCCEEDED,
        )
        if audit.state not in triggerable_states:
            raise exception.AuditStateNotAllowed(state=audit.state)

        # Every check passed: hand the audit to the decision engine.
        self.dc_client.trigger_audit(request_context, audit.uuid)