Coverage for watcher/api/middleware/auth_token.py: 58%

22 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import re 

17 

18from oslo_log import log 

19 

20from keystonemiddleware import auth_token 

21 

22from watcher._i18n import _ 

23from watcher.common import exception 

24from watcher.common import utils 

25 

26LOG = log.getLogger(__name__) 

27 

28 

29class AuthTokenMiddleware(auth_token.AuthProtocol): 

30 """A wrapper on Keystone auth_token middleware. 

31 

32 Does not perform verification of authentication tokens 

33 for public routes in the API. 

34 

35 """ 

36 

37 def __init__(self, app, conf, public_api_routes=()): 

38 route_pattern_tpl = r'%s(\.json|\.xml)?$' 

39 

40 try: 

41 self.public_api_routes = [re.compile(route_pattern_tpl % route_tpl) 

42 for route_tpl in public_api_routes] 

43 except re.error as e: 

44 LOG.exception(e) 

45 raise exception.ConfigInvalid( 

46 error_msg=_('Cannot compile public API routes')) 

47 

48 super(AuthTokenMiddleware, self).__init__(app, conf) 

49 

50 def __call__(self, env, start_response): 

51 path = utils.safe_rstrip(env.get('PATH_INFO'), '/') 

52 

53 # The information whether the API call is being performed against the 

54 # public API is required for some other components. Saving it to the 

55 # WSGI environment is reasonable thereby. 

56 env['is_public_api'] = any(re.match(pattern, path) 

57 for pattern in self.public_api_routes) 

58 

59 if env['is_public_api']: 

60 return self._app(env, start_response) 

61 

62 return super(AuthTokenMiddleware, self).__call__(env, start_response)