Coverage for watcher/api/controllers/v1/utils.py: 88%
89 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# Copyright 2013 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from operator import attrgetter
18import jsonpatch
19from oslo_config import cfg
20from oslo_utils import reflection
21from oslo_utils import uuidutils
22import pecan
23import wsme
25from watcher._i18n import _
26from watcher.api.controllers.v1 import versions
27from watcher.common import utils
28from watcher import objects
30CONF = cfg.CONF
33JSONPATCH_EXCEPTIONS = (jsonpatch.JsonPatchException,
34 jsonpatch.JsonPointerException,
35 KeyError)
38def validate_limit(limit):
39 if limit is None:
40 return CONF.api.max_limit
42 if limit <= 0:
43 # Case where we don't a valid limit value
44 raise wsme.exc.ClientSideError(_("Limit must be positive"))
46 if limit and not CONF.api.max_limit:
47 # Case where we don't have an upper limit
48 return limit
50 return min(CONF.api.max_limit, limit)
53def validate_sort_dir(sort_dir):
54 if sort_dir not in ['asc', 'desc']:
55 raise wsme.exc.ClientSideError(_("Invalid sort direction: %s. "
56 "Acceptable values are "
57 "'asc' or 'desc'") % sort_dir)
60def validate_sort_key(sort_key, allowed_fields):
61 # Very lightweight validation for now
62 if sort_key not in allowed_fields:
63 raise wsme.exc.ClientSideError(
64 _("Invalid sort key: %s") % sort_key)
67def validate_search_filters(filters, allowed_fields):
68 # Very lightweight validation for now
69 # todo: improve this (e.g. https://www.parse.com/docs/rest/guide/#queries)
70 for filter_name in filters:
71 if filter_name not in allowed_fields:
72 raise wsme.exc.ClientSideError(
73 _("Invalid filter: %s") % filter_name)
76def check_need_api_sort(sort_key, additional_fields):
77 return sort_key in additional_fields
80def make_api_sort(sorting_list, sort_key, sort_dir):
81 # First sort by uuid field, than sort by sort_key
82 # sort() ensures stable sorting, so we could
83 # make lexicographical sort
84 reverse_direction = (sort_dir == 'desc')
85 sorting_list.sort(key=attrgetter('uuid'), reverse=reverse_direction)
86 sorting_list.sort(key=attrgetter(sort_key), reverse=reverse_direction)
89def apply_jsonpatch(doc, patch):
90 for p in patch:
91 if p['op'] == 'add' and p['path'].count('/') == 1:
92 if p['path'].lstrip('/') not in doc:
93 msg = _('Adding a new attribute (%s) to the root of '
94 ' the resource is not allowed')
95 raise wsme.exc.ClientSideError(msg % p['path'])
96 return jsonpatch.apply_patch(doc, jsonpatch.JsonPatch(patch))
99def get_patch_value(patch, key):
100 for p in patch:
101 if p['op'] == 'replace' and p['path'] == '/%s' % key:
102 return p['value']
105def set_patch_value(patch, key, value):
106 for p in patch:
107 if p['op'] == 'replace' and p['path'] == '/%s' % key:
108 p['value'] = value
111def get_patch_key(patch, key):
112 for p in patch:
113 if p['op'] == 'replace' and key in p.keys():
114 return p[key][1:]
117def check_audit_state_transition(patch, initial):
118 is_transition_valid = True
119 state_value = get_patch_value(patch, "state")
120 if state_value is not None:
121 is_transition_valid = objects.audit.AuditStateTransitionManager(
122 ).check_transition(initial, state_value)
123 return is_transition_valid
126def as_filters_dict(**filters):
127 filters_dict = {}
128 for filter_name, filter_value in filters.items():
129 if filter_value:
130 filters_dict[filter_name] = filter_value
132 return filters_dict
135def get_resource(resource, resource_id, eager=False):
136 """Get the resource from the uuid, id or logical name.
138 :param resource: the resource type.
139 :param resource_id: the UUID, ID or logical name of the resource.
141 :returns: The resource.
142 """
143 resource = getattr(objects, resource)
145 _get = None
146 if utils.is_int_like(resource_id):
147 resource_id = int(resource_id)
148 _get = resource.get
149 elif uuidutils.is_uuid_like(resource_id):
150 _get = resource.get_by_uuid
151 else:
152 _get = resource.get_by_name
154 method_signature = reflection.get_signature(_get)
155 if 'eager' in method_signature.parameters:
156 return _get(pecan.request.context, resource_id, eager=eager)
158 return _get(pecan.request.context, resource_id)
161def allow_start_end_audit_time():
162 """Check if we should support optional start/end attributes for Audit.
164 Version 1.1 of the API added support for start and end time of continuous
165 audits.
166 """
167 return pecan.request.version.minor >= (
168 versions.VERSIONS.MINOR_1_START_END_TIMING.value)
171def allow_force():
172 """Check if we should support optional force attribute for Audit.
174 Version 1.2 of the API added support for forced audits that allows to
175 launch audit when other action plan is ongoing.
176 """
177 return pecan.request.version.minor >= (
178 versions.VERSIONS.MINOR_2_FORCE.value)
181def allow_list_datamodel():
182 """Check if we should support list data model API.
184 Version 1.3 of the API added support to list data model.
185 """
186 return pecan.request.version.minor >= (
187 versions.VERSIONS.MINOR_3_DATAMODEL.value)
190def allow_webhook_api():
191 """Check if we should support webhook API.
193 Version 1.4 of the API added support to trigger webhook.
194 """
195 return pecan.request.version.minor >= (
196 versions.VERSIONS.MINOR_4_WEBHOOK_API.value)