Coverage for watcher/notifications/base.py: 95%

77 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15from oslo_config import cfg 

16from oslo_log import log 

17 

18from watcher.common import exception 

19from watcher.common import rpc 

20from watcher.objects import base 

21from watcher.objects import fields as wfields 

22 

# Global oslo.config object and the logger for this module.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Notification priorities mapped to a numeric severity. The priorities are
# listed from least to most severe, and the position of each one in the
# sequence (0 for DEBUG up to 4 for CRITICAL) is used as its level.
NOTIFY_LEVELS = {
    priority: severity
    for severity, priority in enumerate((
        wfields.NotificationPriority.DEBUG,
        wfields.NotificationPriority.INFO,
        wfields.NotificationPriority.WARNING,
        wfields.NotificationPriority.ERROR,
        wfields.NotificationPriority.CRITICAL,
    ))
}

34 

35 

@base.WatcherObjectRegistry.register_if(False)
class NotificationObject(base.WatcherObject):
    """Common ancestor of every notification related versioned object.

    Neither ``save()`` nor ``obj_load_attr()`` is supported on notification
    objects; both raise ``UnsupportedError``.
    """

    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self, **kwargs):
        """Build the object from ``kwargs`` and clear its change tracking.

        :param kwargs: forwarded unchanged to the parent constructor
        """
        super().__init__(**kwargs)
        # Notification objects are instantiated on the fly at the moment
        # watcher emits a notification, so right after construction every
        # field is reported as changed. That carries no useful information
        # for consumers, hence the change list is wiped here.
        self.obj_reset_changes(recursive=False)

    def save(self, context):
        """Not supported for notification objects.

        :raises: UnsupportedError unconditionally
        """
        raise exception.UnsupportedError()

    def obj_load_attr(self, attrname):
        """Not supported for notification objects.

        :raises: UnsupportedError unconditionally
        """
        raise exception.UnsupportedError()

56 

57 

@base.WatcherObjectRegistry.register_notification
class EventType(NotificationObject):
    """Versioned object holding the parts of a notification event type.

    The event type is made of an object name, an action and an optional
    phase, see ``to_notification_event_type_field()`` for the wire format.
    """

    # Version 1.0: Initial version
    # Version 1.1: Added STRATEGY action in NotificationAction enum
    # Version 1.2: Added PLANNER action in NotificationAction enum
    # Version 1.3: Added EXECUTION action in NotificationAction enum
    VERSION = '1.3'

    fields = {
        'object': wfields.StringField(),
        'action': wfields.NotificationActionField(),
        'phase': wfields.NotificationPhaseField(nullable=True),
    }

    def to_notification_event_type_field(self):
        """Serialize the object to the wire format.

        :returns: the string ``<object>.<action>``, with ``.<phase>``
                  appended only when the phase is set to a non-null value
        """
        s = '%s.%s' % (self.object, self.action)
        # 'phase' is declared nullable, so it can be explicitly set to None.
        # Checking only obj_attr_is_set() would then append a literal
        # '.None' to the event type, therefore the value is checked as well.
        if self.obj_attr_is_set('phase') and self.phase is not None:
            s += '.%s' % self.phase
        return s

79 

80 

@base.WatcherObjectRegistry.register_if(False)
class NotificationPayloadBase(NotificationObject):
    """Base class for the payload of versioned notifications."""
    # SCHEMA describes how payload fields are filled from other objects. It
    # is a dictionary whose entries look like this:
    #     <payload_field_name>: (<data_source_name>,
    #                            <field_of_the_data_source>)
    # <payload_field_name> is the payload attribute that receives the data;
    # it has to be declared as a field of the payload.
    # <data_source_name> is the name of the keyword argument handed to the
    # payload's populate_schema() call; the object passed under that name is
    # the source of the data. <field_of_the_data_source> shall be a valid
    # field of that object.
    # populate_schema() has to be called to apply the SCHEMA before the
    # notification can be emitted. It sets payload.<payload_field_name> from
    # <data_source_name>.<field_of_the_data_source>. The data source itself
    # does not become part of the payload object's internal or external
    # representation.
    # Payload fields not covered by the SCHEMA can be filled in the same way
    # as in any versioned object.
    SCHEMA = {}
    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self, **kwargs):
        """Build the payload and initialise its ``populated`` flag.

        :param kwargs: forwarded unchanged to the parent constructor
        """
        super().__init__(**kwargs)
        # A payload with an empty SCHEMA has nothing to populate, so it is
        # flagged as populated from the start.
        self.populated = not self.SCHEMA

    def populate_schema(self, **kwargs):
        """Populate the object based on the SCHEMA and the source objects

        :param kwargs: A dict contains the source object at the key defined in
                       the SCHEMA
        """
        for payload_field, (source_name, source_field) in self.SCHEMA.items():
            source = kwargs[source_name]
            # Source fields that are not set are left out of the payload.
            if not source.obj_attr_is_set(source_field):
                continue
            setattr(self, payload_field, getattr(source, source_field))
        self.populated = True

        # Populating from the schema marks fields as changed, but that
        # information is not wanted in the notification.
        self.obj_reset_changes(recursive=False)

125 

126 

@base.WatcherObjectRegistry.register_notification
class NotificationPublisher(NotificationObject):
    """Versioned object naming the publisher of a notification.

    Both fields are mandatory (non-nullable) strings.
    """

    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        # Host part of the publisher identity.
        'host': wfields.StringField(nullable=False),
        # Binary (service) part of the publisher identity.
        'binary': wfields.StringField(nullable=False),
    }

136 

137 

@base.WatcherObjectRegistry.register_if(False)
class NotificationBase(NotificationObject):
    """Base class for versioned notifications.

    Every subclass shall define a 'payload' field.
    """

    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        'priority': wfields.NotificationPriorityField(),
        'event_type': wfields.ObjectField('EventType'),
        'publisher': wfields.ObjectField('NotificationPublisher'),
    }

    def save(self, context):
        """Not supported for notifications.

        :raises: UnsupportedError unconditionally
        """
        raise exception.UnsupportedError()

    def obj_load_attr(self, attrname):
        """Not supported for notifications.

        :raises: UnsupportedError unconditionally
        """
        raise exception.UnsupportedError()

    def _should_notify(self):
        """Determine whether the notification should be sent.

        The notification goes out when its own level is at least the level
        set in the configuration, levels being ordered as DEBUG, INFO,
        WARNING, ERROR, CRITICAL. Nothing is sent when no notification
        level is configured.

        :return: True if notification should be sent, False otherwise.
        """
        configured_level = CONF.notification_level
        if not configured_level:
            return False
        own_severity = NOTIFY_LEVELS[self.priority]
        threshold = NOTIFY_LEVELS[configured_level]
        return own_severity >= threshold

    def _emit(self, context, event_type, publisher_id, payload):
        """Hand the notification over to the notifier.

        The notifier obtained for ``publisher_id`` is asked for the method
        named after this notification's priority, and that method is called
        with the event type and the payload.

        :param context: request context passed through to the notifier
        :param event_type: event type string of the notification
        :param publisher_id: identifier used to obtain the notifier
        :param payload: payload of the notification
        """
        send = getattr(rpc.get_notifier(publisher_id), self.priority)
        LOG.debug("Emitting notification `%s`", event_type)
        send(context, event_type=event_type, payload=payload)

    def emit(self, context):
        """Send the notification.

        Nothing happens when the configured notification level filters the
        notification out.

        :param context: request context passed through to the notifier
        :raises: NotificationPayloadError if the payload has not been
                 populated
        """
        if not self._should_notify():
            return

        payload = self.payload
        if not payload.populated:
            raise exception.NotificationPayloadError(
                class_name=self.__class__.__name__)

        # The payload is a freshly populated object, so all of its fields
        # appear as changed. That holds no extra information and is dropped
        # before serialization.
        payload.obj_reset_changes(recursive=False)

        event_type = self.event_type.to_notification_event_type_field()
        publisher = self.publisher
        publisher_id = '%s:%s' % (publisher.binary, publisher.host)
        self._emit(
            context,
            event_type=event_type,
            publisher_id=publisher_id,
            payload=payload.obj_to_primitive())

198 

199 

def notification_sample(sample):
    """Provide a notification sample of the decorated notification.

    Class decorator to attach the notification sample information
    to the notification object for documentation generation purposes.
    The decorator can be applied several times to the same class; the
    samples accumulate in the class's ``samples`` list in the order the
    decorators are applied.

    :param sample: the path of the sample json file relative to the
                   doc/notification_samples/ directory in the watcher
                   repository root.
    :returns: a class decorator that records ``sample`` on the class and
              returns the class itself.
    """
    def wrap(cls):
        # Bind a fresh list on the decorated class instead of appending in
        # place. When ``samples`` is only inherited, an in-place append
        # would modify the parent's list and leak this sample into the
        # parent class and all of its other subclasses.
        existing = getattr(cls, 'samples', None) or []
        cls.samples = [*existing, sample]
        return cls
    return wrap
216 return wrap