Coverage for watcher/common/utils.py: 52%

89 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17"""Utilities and helper functions.""" 

18 

19import asyncio 

20import datetime 

21import inspect 

22import random 

23import re 

24import string 

25 

26from croniter import croniter 

27import eventlet 

28from eventlet import tpool 

29 

30from jsonschema import validators 

31from oslo_config import cfg 

32from oslo_log import log 

33from oslo_utils import strutils 

34from oslo_utils import uuidutils 

35 

36from watcher.common import exception 

37 

38CONF = cfg.CONF 

39 

40LOG = log.getLogger(__name__) 

41 

42 

43class Struct(dict): 

44 """Specialized dict where you access an item like an attribute 

45 

46 >>> struct = Struct() 

47 >>> struct['a'] = 1 

48 >>> struct.b = 2 

49 >>> assert struct.a == 1 

50 >>> assert struct['b'] == 2 

51 """ 

52 

53 def __getattr__(self, name): 

54 try: 

55 return self[name] 

56 except KeyError: 

57 raise AttributeError(name) 

58 

59 def __setattr__(self, name, value): 

60 try: 

61 self[name] = value 

62 except KeyError: 

63 raise AttributeError(name) 

64 

65 

66generate_uuid = uuidutils.generate_uuid 

67is_uuid_like = uuidutils.is_uuid_like 

68is_int_like = strutils.is_int_like 

69 

70 

71def is_cron_like(value): 

72 """Return True is submitted value is like cron syntax""" 

73 try: 

74 croniter(value, datetime.datetime.now()) 

75 except Exception as e: 

76 raise exception.CronFormatIsInvalid(message=str(e)) 

77 return True 

78 

79 

80def safe_rstrip(value, chars=None): 

81 """Removes trailing characters from a string if that does not make it empty 

82 

83 :param value: A string value that will be stripped. 

84 :param chars: Characters to remove. 

85 :return: Stripped value. 

86 

87 """ 

88 if not isinstance(value, str): 

89 LOG.warning( 

90 "Failed to remove trailing character. Returning original object." 

91 "Supplied object is not a string: %s,", value) 

92 return value 

93 

94 return value.rstrip(chars) or value 

95 

96 

97def is_hostname_safe(hostname): 

98 """Determine if the supplied hostname is RFC compliant. 

99 

100 Check that the supplied hostname conforms to: 

101 * http://en.wikipedia.org/wiki/Hostname 

102 * http://tools.ietf.org/html/rfc952 

103 * http://tools.ietf.org/html/rfc1123 

104 

105 :param hostname: The hostname to be validated. 

106 :returns: True if valid. False if not. 

107 

108 """ 

109 m = r'^[a-z0-9]([a-z0-9\-]{0,61}[a-z0-9])?$' 

110 return (isinstance(hostname, str) and 

111 (re.match(m, hostname) is not None)) 

112 

113 

114def get_cls_import_path(cls): 

115 """Return the import path of a given class""" 

116 module = cls.__module__ 

117 if module is None or module == str.__module__: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 return cls.__name__ 

119 return module + '.' + cls.__name__ 

120 

121 

122# Default value feedback extension as jsonschema doesn't support it 

123def extend_with_default(validator_class): 

124 validate_properties = validator_class.VALIDATORS["properties"] 

125 

126 def set_defaults(validator, properties, instance, schema): 

127 for prop, subschema in properties.items(): 

128 if "default" in subschema and instance is not None: 

129 instance.setdefault(prop, subschema["default"]) 

130 

131 for error in validate_properties( 131 ↛ 134line 131 didn't jump to line 134 because the loop on line 131 never started

132 validator, properties, instance, schema 

133 ): 

134 yield error 

135 

136 return validators.extend(validator_class, 

137 {"properties": set_defaults}) 

138 

139 

140# Parameter strict check extension as jsonschema doesn't support it 

141def extend_with_strict_schema(validator_class): 

142 validate_properties = validator_class.VALIDATORS["properties"] 

143 

144 def strict_schema(validator, properties, instance, schema): 

145 if instance is None: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true

146 return 

147 

148 for para in instance.keys(): 

149 if para not in properties.keys(): 

150 raise exception.AuditParameterNotAllowed(parameter=para) 

151 

152 for error in validate_properties( 152 ↛ 155line 152 didn't jump to line 155 because the loop on line 152 never started

153 validator, properties, instance, schema 

154 ): 

155 yield error 

156 

157 return validators.extend(validator_class, {"properties": strict_schema}) 

158 

159 

160StrictDefaultValidatingDraft4Validator = extend_with_default( 

161 extend_with_strict_schema(validators.Draft4Validator)) 

162 

163Draft4Validator = validators.Draft4Validator 

164 

165 

166def random_string(n): 

167 return ''.join([random.choice( 

168 string.ascii_letters + string.digits) for i in range(n)]) 

169 

170 

171# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet. 

172# As a workaround, we're delegating such calls to a native thread. 

173def async_compat_call(f, *args, **kwargs): 

174 timeout = kwargs.pop('timeout', None) 

175 

176 async def async_wrapper(): 

177 ret = f(*args, **kwargs) 

178 if inspect.isawaitable(ret): 

179 return await asyncio.wait_for(ret, timeout) 

180 return ret 

181 

182 def tpool_wrapper(): 

183 # This will run in a separate native thread. Ideally, there should be 

184 # a single thread permanently running an asyncio loop, but for 

185 # convenience we'll use eventlet.tpool, which leverages a thread pool. 

186 # 

187 # That being considered, we're setting up a temporary asyncio loop to 

188 # handle this call. 

189 loop = asyncio.new_event_loop() 

190 try: 

191 asyncio.set_event_loop(loop) 

192 return loop.run_until_complete(async_wrapper()) 

193 finally: 

194 loop.close() 

195 

196 # We'll use eventlet timeouts as an extra precaution and asyncio timeouts 

197 # to avoid lingering threads. For consistency, we'll convert eventlet 

198 # timeout exceptions to asyncio timeout errors. 

199 with eventlet.timeout.Timeout( 

200 seconds=timeout, 

201 exception=asyncio.TimeoutError("Timeout: %ss" % timeout)): 

202 return tpool.execute(tpool_wrapper)