Coverage for watcher/common/utils.py: 52%
89 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2015 b<>com
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17"""Utilities and helper functions."""
19import asyncio
20import datetime
21import inspect
22import random
23import re
24import string
26from croniter import croniter
27import eventlet
28from eventlet import tpool
30from jsonschema import validators
31from oslo_config import cfg
32from oslo_log import log
33from oslo_utils import strutils
34from oslo_utils import uuidutils
36from watcher.common import exception
# Global oslo.config configuration object shared by this module.
CONF = cfg.CONF

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
class Struct(dict):
    """Specialized dict where you access an item like an attribute

    Attribute reads and writes are forwarded to the dict items, so
    ``struct.a`` and ``struct['a']`` refer to the same value.

    >>> struct = Struct()
    >>> struct['a'] = 1
    >>> struct.b = 2
    >>> assert struct.a == 1
    >>> assert struct['b'] == 2
    """

    def __getattr__(self, name):
        """Return the item stored under the key ``name``.

        Python only calls this hook after regular attribute lookup has
        failed, so real dict methods are never shadowed by items.

        :param name: The attribute (item key) being looked up.
        :returns: The value stored under ``name``.
        :raises AttributeError: If there is no item named ``name``.
        """
        try:
            return self[name]
        except KeyError:
            # getattr()/hasattr() rely on AttributeError, not KeyError. The
            # KeyError carries no extra information, so it is not chained.
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        """Store ``value`` as the item ``name``.

        :param name: The attribute (item key) to set.
        :param value: The value to store.
        """
        # Item assignment on a dict never raises KeyError, so there is no
        # exception to translate here.
        self[name] = value
# Convenience aliases re-exporting oslo_utils helpers from this module.
generate_uuid = uuidutils.generate_uuid
is_uuid_like = uuidutils.is_uuid_like
is_int_like = strutils.is_int_like
def is_cron_like(value):
    """Return True if the submitted value is valid cron syntax.

    The value is handed to ``croniter`` together with the current time;
    any error raised while building the iterator marks the value as
    invalid.

    :param value: The cron expression to check.
    :returns: True when ``croniter`` accepts the expression. This function
              never returns False; invalid input raises instead.
    :raises exception.CronFormatIsInvalid: If ``croniter`` rejects the
        value. The message is the text of the underlying error, which is
        also attached as the exception cause.
    """
    try:
        croniter(value, datetime.datetime.now())
    except Exception as e:
        # Keep the original error as the explicit cause for debugging.
        raise exception.CronFormatIsInvalid(message=str(e)) from e
    return True
def safe_rstrip(value, chars=None):
    """Removes trailing characters from a string if that does not make it empty

    :param value: A string value that will be stripped.
    :param chars: Characters to remove. ``None`` strips whitespace, as
                  with ``str.rstrip``.
    :return: Stripped value. If stripping would leave an empty string, or
             if ``value`` is not a string, ``value`` is returned unchanged.

    """
    if not isinstance(value, str):
        # Non-string input is passed through untouched; only a warning is
        # logged so callers are not broken by unexpected types.
        LOG.warning(
            "Failed to remove trailing character. Returning original object. "
            "Supplied object is not a string: %s", value)
        return value

    # An empty result is falsy, so `or` falls back to the original value.
    return value.rstrip(chars) or value
def is_hostname_safe(hostname):
    """Determine if the supplied hostname is RFC compliant.

    Check that the supplied hostname conforms to:
        * http://en.wikipedia.org/wiki/Hostname
        * http://tools.ietf.org/html/rfc952
        * http://tools.ietf.org/html/rfc1123

    The check accepts a single label of 1 to 63 characters made of
    lowercase ASCII letters, digits and hyphens, which must start and end
    with a letter or digit.

    :param hostname: The hostname to be validated.
    :returns: True if valid. False if not.

    """
    if not isinstance(hostname, str):
        return False
    # fullmatch() is used instead of match() with a trailing `$`, because `$`
    # also matches just before a final newline and would accept "host\n".
    pattern = r'[a-z0-9]([a-z0-9\-]{0,61}[a-z0-9])?'
    return re.fullmatch(pattern, hostname) is not None
def get_cls_import_path(cls):
    """Return the import path of a given class

    :param cls: The class to describe.
    :returns: ``<module>.<name>`` for the class, or just the class name
              when its module is ``None`` or is the module that defines
              ``str`` (i.e. the builtins module).
    """
    class_name = cls.__name__
    module_name = cls.__module__
    has_importable_module = (
        module_name is not None and module_name != str.__module__)
    if has_importable_module:
        return '.'.join((module_name, class_name))
    return class_name
# Default value feedback extension as jsonschema doesn't support it
def extend_with_default(validator_class):
    """Return a validator class that also fills in schema defaults.

    The returned class wraps the ``properties`` keyword of
    ``validator_class``: before delegating to the original check, every
    property whose subschema declares a ``default`` is inserted into the
    instance if it is missing.

    :param validator_class: A jsonschema validator class to extend.
    :returns: A new validator class built with ``validators.extend``.
    """
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        # Defaults can only be stored on object (dict) instances. Anything
        # else (None, lists, scalars) is left untouched and handed to the
        # original keyword check instead of failing on .setdefault().
        if isinstance(instance, dict):
            for prop, subschema in properties.items():
                if isinstance(subschema, dict) and "default" in subschema:
                    # setdefault() keeps any value the caller supplied.
                    instance.setdefault(prop, subschema["default"])

        yield from validate_properties(
            validator, properties, instance, schema)

    return validators.extend(validator_class,
                             {"properties": set_defaults})
# Parameter strict check extension as jsonschema doesn't support it
def extend_with_strict_schema(validator_class):
    """Return a validator class that rejects undeclared parameters.

    The returned class wraps the ``properties`` keyword of
    ``validator_class``: any key of the instance that is not declared in
    the schema's ``properties`` is refused before the original check runs.

    :param validator_class: A jsonschema validator class to extend.
    :returns: A new validator class built with ``validators.extend``.
    """
    validate_properties = validator_class.VALIDATORS["properties"]

    def strict_schema(validator, properties, instance, schema):
        # Only object (dict) instances have parameters to check. None and
        # other non-object values are skipped here rather than failing on
        # a missing .keys() method.
        if not isinstance(instance, dict):
            return

        for para in instance:
            if para not in properties:
                # Raised rather than yielded as a validation error, so the
                # first undeclared parameter aborts validation.
                raise exception.AuditParameterNotAllowed(parameter=para)

        yield from validate_properties(
            validator, properties, instance, schema)

    return validators.extend(validator_class, {"properties": strict_schema})
# Plain Draft 4 validator, re-exported for callers of this module.
Draft4Validator = validators.Draft4Validator

# Draft 4 validator extended first with the strict parameter check and then
# with default-value filling.
StrictDefaultValidatingDraft4Validator = extend_with_default(
    extend_with_strict_schema(Draft4Validator))
def random_string(n):
    """Return a random string of ``n`` ASCII letters and digits.

    Characters are drawn one at a time with ``random.choice``.

    :param n: Number of characters to generate.
    :returns: The generated string (empty when ``n`` is 0 or negative).
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(n))
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
# As a workaround, we're delegating such calls to a native thread.
def async_compat_call(f, *args, **kwargs):
    """Call ``f`` from a native thread that runs its own asyncio loop.

    :param f: The callable to invoke. It may return a plain value or an
              awaitable.
    :param args: Positional arguments forwarded to ``f``.
    :param kwargs: Keyword arguments forwarded to ``f``. The ``timeout``
                   key is removed first and is not passed to ``f``.
    :returns: The value returned by ``f``, or the result of awaiting it
              when ``f`` returns an awaitable.
    :raises asyncio.TimeoutError: When the asyncio wait or the eventlet
        timeout expires.
    """
    timeout = kwargs.pop('timeout', None)

    async def _invoke():
        result = f(*args, **kwargs)
        if not inspect.isawaitable(result):
            return result
        # Only awaitable results are bounded by the asyncio timeout.
        return await asyncio.wait_for(result, timeout)

    def _run_in_native_thread():
        # This will run in a separate native thread. Ideally, there should be
        # a single thread permanently running an asyncio loop, but for
        # convenience we'll use eventlet.tpool, which leverages a thread pool.
        #
        # That being considered, we're setting up a temporary asyncio loop to
        # handle this call.
        event_loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(event_loop)
            return event_loop.run_until_complete(_invoke())
        finally:
            event_loop.close()

    # We'll use eventlet timeouts as an extra precaution and asyncio timeouts
    # to avoid lingering threads. For consistency, we'll convert eventlet
    # timeout exceptions to asyncio timeout errors.
    timeout_error = asyncio.TimeoutError("Timeout: %ss" % timeout)
    with eventlet.timeout.Timeout(seconds=timeout, exception=timeout_error):
        return tpool.execute(_run_in_native_thread)