Coverage for watcher/decision_engine/model/notification/filtering.py: 84%

29 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 b<>com 

3# 

4# Authors: Vincent FRANCOISE <Vincent.FRANCOISE@b-com.com> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18 

19import re 

20 

21import oslo_messaging as om 

22 

23 

24class NotificationFilter(om.NotificationFilter): 

25 """Notification Endpoint base class 

26 

27 This class is responsible for handling incoming notifications. Depending 

28 on the priority level of the incoming, you may need to implement one or 

29 more of the following methods: 

30 

31 .. code: py 

32 def audit(self, ctxt, publisher_id, event_type, payload, metadata): 

33 do_something(payload) 

34 

35 def info(self, ctxt, publisher_id, event_type, payload, metadata): 

36 do_something(payload) 

37 

38 def warn(self, ctxt, publisher_id, event_type, payload, metadata): 

39 do_something(payload) 

40 

41 def error(self, ctxt, publisher_id, event_type, payload, metadata): 

42 do_something(payload) 

43 

44 def critical(self, ctxt, publisher_id, event_type, payload, metadata): 

45 do_something(payload) 

46 """ 

47 

48 def _build_regex_dict(self, regex_list): 

49 if regex_list is None: 

50 return {} 

51 

52 regex_mapping = {} 

53 for key, value in regex_list.items(): 

54 if isinstance(value, dict): 

55 regex_mapping[key] = self._build_regex_dict(value) 

56 else: 

57 if callable(value): 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true

58 regex_mapping[key] = value 

59 elif value is not None: 59 ↛ 62line 59 didn't jump to line 62 because the condition on line 59 was always true

60 regex_mapping[key] = re.compile(value) 

61 else: 

62 regex_mapping[key] = None 

63 

64 return regex_mapping 

65 

66 def _check_for_mismatch(self, data, regex): 

67 if isinstance(regex, dict): 

68 mismatch_results = [ 

69 k not in data or not self._check_for_mismatch(data[k], v) 

70 for k, v in regex.items() 

71 ] 

72 if not mismatch_results: 

73 return False 

74 

75 return all(mismatch_results) 

76 elif callable(regex): 76 ↛ 79line 76 didn't jump to line 79 because the condition on line 76 was never true

77 # The filter is a callable that should return True 

78 # if there is a mismatch 

79 return regex(data) 

80 elif regex is not None and data is None: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 return True 

82 elif (regex is not None and 

83 isinstance(data, str) and 

84 not regex.match(data)): 

85 return True 

86 

87 return False