Coverage for watcher/common/policy.py: 62%

32 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# Copyright (c) 2011 OpenStack Foundation 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Policy Engine For Watcher.""" 

17 

18import sys 

19 

20from oslo_config import cfg 

21from oslo_policy import policy 

22 

23from watcher.common import exception 

24from watcher.common import policies 

25 

# Module-level policy enforcer. It starts out unset; init() creates it on
# first use and every later call returns this same object.
_ENFORCER = None

# Global oslo.config configuration object; init() hands it to the enforcer
# and get_enforcer() parses the command line through it.
CONF = cfg.CONF

28 

29 

# Calling init() hands back the policy enforcer.
# oslo.policy supports changing policy rules dynamically: at present,
# policy.enforce reloads the policy rules whenever it detects that the
# policy files have been touched.
def init(policy_file=None, rules=None,
         default_rule=None, use_conf=True, overwrite=True):
    """Return the shared Enforcer, creating it on the first call.

    The arguments are only used when no enforcer exists yet; once one has
    been created, later calls return it unchanged and ignore them.

    :param policy_file: Custom policy file to use; when none is given,
                        ``conf.policy_file`` is used.
    :param rules: Default dictionary / Rules to use. Only considered in
                  the first instantiation. It is overwritten if
                  :meth:`load_rules` with ``force_reload=True``,
                  :meth:`clear` or :meth:`set_rules` with
                  ``overwrite=True`` is called.
    :param default_rule: Default rule to use; ``conf.default_rule`` is
                         used when none is given.
    :param use_conf: Whether to load rules from cache or config file.
    :param overwrite: Whether to overwrite existing rules when reloading
                      rules from the config file.
    :return: The module-level enforcer instance.
    """
    global _ENFORCER
    if _ENFORCER:
        # Already built by an earlier call: reuse it as-is.
        return _ENFORCER

    # https://docs.openstack.org/oslo.policy/latest/admin/index.html
    enforcer_options = {
        'policy_file': policy_file,
        'rules': rules,
        'default_rule': default_rule,
        'use_conf': use_conf,
        'overwrite': overwrite,
    }
    _ENFORCER = policy.Enforcer(CONF, **enforcer_options)
    # Register the in-code default rules on the freshly created enforcer.
    _ENFORCER.register_defaults(policies.list_rules())
    return _ENFORCER

63 

64 

def enforce(context, rule=None, target=None,
            do_raise=True, exc=None, *args, **kwargs):
    """Check authorization of a rule against the target and credentials.

    :param context: Request context of the user performing the action. It
                    must provide ``to_dict()`` (used as the credentials)
                    as well as ``project_id`` and ``user_id``.
    :param rule: The rule to evaluate.
    :param dict target: As much information about the object being operated
                        on as possible. When ``None``, a target made of the
                        context's own ``project_id`` and ``user_id`` is
                        used.
    :param do_raise: Whether to raise an exception or not if the check
                     fails.
    :param exc: Class of the exception to raise if the check fails.
                Any remaining arguments passed to :meth:`enforce` (both
                positional and keyword arguments) will be passed to
                the exception class. If not specified,
                :class:`PolicyNotAuthorized` will be used.

    :return: Whatever the underlying enforcer's ``enforce`` returns. Per
             the oslo.policy documentation that is ``False`` when the
             policy does not allow the action and ``do_raise`` is false;
             otherwise a value that evaluates to ``True``. Note: for rules
             using the "case" expression, this ``True`` value will be the
             specified string from the expression.
    """
    enforcer = init()
    credentials = context.to_dict()
    if not exc:
        exc = exception.PolicyNotAuthorized
    if target is None:
        target = {'project_id': context.project_id,
                  'user_id': context.user_id}
    # do_raise and exc are passed positionally on purpose: *args is always
    # unpacked into the positional slots that follow ``credentials``, so
    # passing them by keyword would make any extra positional argument
    # collide with ``do_raise`` ("got multiple values for argument")
    # instead of being forwarded to the exception class.
    return enforcer.enforce(rule, target, credentials,
                            do_raise, exc, *args, **kwargs)

97 

98 

def get_enforcer():
    """Return the policy enforcer for use by the oslopolicy CLI scripts.

    Those scripts need the 'output-file' and 'namespace' options, but
    having those in sys.argv means loading the Watcher config options will
    fail as those are not expected to be present. So the configuration is
    parsed from an argument list with those options stripped out, in both
    the ``--opt value`` and the ``--opt=value`` spelling.

    :return: The module-level enforcer, initialised after the
             configuration has been loaded.
    """
    skipped_opts = ('namespace', 'output-file')
    conf_args = []
    # Skip sys.argv[0] because cfg.CONF expects the equivalent of
    # sys.argv[1:].
    remaining = iter(sys.argv[1:])
    for arg in remaining:
        if arg.strip('-') in skipped_opts:
            # "--opt value": drop the option and the value that follows it
            # (the default keeps a trailing option without value harmless).
            next(remaining, None)
            continue
        if (arg.startswith('-') and
                arg.partition('=')[0].strip('-') in skipped_opts):
            # "--opt=value": the value travels in the same token, so only
            # this single token has to be dropped.
            continue
        conf_args.append(arg)

    cfg.CONF(conf_args, project='watcher')
    return init()
116 return _ENFORCER