Coverage for watcher/api/controllers/v1/utils.py: 88%

89 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# Copyright 2013 Red Hat, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from operator import attrgetter 

17 

18import jsonpatch 

19from oslo_config import cfg 

20from oslo_utils import reflection 

21from oslo_utils import uuidutils 

22import pecan 

23import wsme 

24 

25from watcher._i18n import _ 

26from watcher.api.controllers.v1 import versions 

27from watcher.common import utils 

28from watcher import objects 

29 

30CONF = cfg.CONF 

31 

32 

33JSONPATCH_EXCEPTIONS = (jsonpatch.JsonPatchException, 

34 jsonpatch.JsonPointerException, 

35 KeyError) 

36 

37 

38def validate_limit(limit): 

39 if limit is None: 

40 return CONF.api.max_limit 

41 

42 if limit <= 0: 

43 # Case where we don't a valid limit value 

44 raise wsme.exc.ClientSideError(_("Limit must be positive")) 

45 

46 if limit and not CONF.api.max_limit: 

47 # Case where we don't have an upper limit 

48 return limit 

49 

50 return min(CONF.api.max_limit, limit) 

51 

52 

53def validate_sort_dir(sort_dir): 

54 if sort_dir not in ['asc', 'desc']: 

55 raise wsme.exc.ClientSideError(_("Invalid sort direction: %s. " 

56 "Acceptable values are " 

57 "'asc' or 'desc'") % sort_dir) 

58 

59 

60def validate_sort_key(sort_key, allowed_fields): 

61 # Very lightweight validation for now 

62 if sort_key not in allowed_fields: 

63 raise wsme.exc.ClientSideError( 

64 _("Invalid sort key: %s") % sort_key) 

65 

66 

67def validate_search_filters(filters, allowed_fields): 

68 # Very lightweight validation for now 

69 # todo: improve this (e.g. https://www.parse.com/docs/rest/guide/#queries) 

70 for filter_name in filters: 

71 if filter_name not in allowed_fields: 

72 raise wsme.exc.ClientSideError( 

73 _("Invalid filter: %s") % filter_name) 

74 

75 

76def check_need_api_sort(sort_key, additional_fields): 

77 return sort_key in additional_fields 

78 

79 

80def make_api_sort(sorting_list, sort_key, sort_dir): 

81 # First sort by uuid field, than sort by sort_key 

82 # sort() ensures stable sorting, so we could 

83 # make lexicographical sort 

84 reverse_direction = (sort_dir == 'desc') 

85 sorting_list.sort(key=attrgetter('uuid'), reverse=reverse_direction) 

86 sorting_list.sort(key=attrgetter(sort_key), reverse=reverse_direction) 

87 

88 

89def apply_jsonpatch(doc, patch): 

90 for p in patch: 

91 if p['op'] == 'add' and p['path'].count('/') == 1: 

92 if p['path'].lstrip('/') not in doc: 

93 msg = _('Adding a new attribute (%s) to the root of ' 

94 ' the resource is not allowed') 

95 raise wsme.exc.ClientSideError(msg % p['path']) 

96 return jsonpatch.apply_patch(doc, jsonpatch.JsonPatch(patch)) 

97 

98 

99def get_patch_value(patch, key): 

100 for p in patch: 

101 if p['op'] == 'replace' and p['path'] == '/%s' % key: 

102 return p['value'] 

103 

104 

105def set_patch_value(patch, key, value): 

106 for p in patch: 

107 if p['op'] == 'replace' and p['path'] == '/%s' % key: 

108 p['value'] = value 

109 

110 

111def get_patch_key(patch, key): 

112 for p in patch: 

113 if p['op'] == 'replace' and key in p.keys(): 

114 return p[key][1:] 

115 

116 

117def check_audit_state_transition(patch, initial): 

118 is_transition_valid = True 

119 state_value = get_patch_value(patch, "state") 

120 if state_value is not None: 

121 is_transition_valid = objects.audit.AuditStateTransitionManager( 

122 ).check_transition(initial, state_value) 

123 return is_transition_valid 

124 

125 

126def as_filters_dict(**filters): 

127 filters_dict = {} 

128 for filter_name, filter_value in filters.items(): 

129 if filter_value: 

130 filters_dict[filter_name] = filter_value 

131 

132 return filters_dict 

133 

134 

135def get_resource(resource, resource_id, eager=False): 

136 """Get the resource from the uuid, id or logical name. 

137 

138 :param resource: the resource type. 

139 :param resource_id: the UUID, ID or logical name of the resource. 

140 

141 :returns: The resource. 

142 """ 

143 resource = getattr(objects, resource) 

144 

145 _get = None 

146 if utils.is_int_like(resource_id): 

147 resource_id = int(resource_id) 

148 _get = resource.get 

149 elif uuidutils.is_uuid_like(resource_id): 

150 _get = resource.get_by_uuid 

151 else: 

152 _get = resource.get_by_name 

153 

154 method_signature = reflection.get_signature(_get) 

155 if 'eager' in method_signature.parameters: 

156 return _get(pecan.request.context, resource_id, eager=eager) 

157 

158 return _get(pecan.request.context, resource_id) 

159 

160 

161def allow_start_end_audit_time(): 

162 """Check if we should support optional start/end attributes for Audit. 

163 

164 Version 1.1 of the API added support for start and end time of continuous 

165 audits. 

166 """ 

167 return pecan.request.version.minor >= ( 

168 versions.VERSIONS.MINOR_1_START_END_TIMING.value) 

169 

170 

171def allow_force(): 

172 """Check if we should support optional force attribute for Audit. 

173 

174 Version 1.2 of the API added support for forced audits that allows to 

175 launch audit when other action plan is ongoing. 

176 """ 

177 return pecan.request.version.minor >= ( 

178 versions.VERSIONS.MINOR_2_FORCE.value) 

179 

180 

181def allow_list_datamodel(): 

182 """Check if we should support list data model API. 

183 

184 Version 1.3 of the API added support to list data model. 

185 """ 

186 return pecan.request.version.minor >= ( 

187 versions.VERSIONS.MINOR_3_DATAMODEL.value) 

188 

189 

190def allow_webhook_api(): 

191 """Check if we should support webhook API. 

192 

193 Version 1.4 of the API added support to trigger webhook. 

194 """ 

195 return pecan.request.version.minor >= ( 

196 versions.VERSIONS.MINOR_4_WEBHOOK_API.value)