Coverage for watcher/notifications/base.py: 95%
77 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15from oslo_config import cfg
16from oslo_log import log
18from watcher.common import exception
19from watcher.common import rpc
20from watcher.objects import base
21from watcher.objects import fields as wfields
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Every notification priority mapped to a numeric rank, least severe first,
# so that two priorities can be compared with plain integer comparison.
NOTIFY_LEVELS = {
    priority: rank
    for rank, priority in enumerate((
        wfields.NotificationPriority.DEBUG,
        wfields.NotificationPriority.INFO,
        wfields.NotificationPriority.WARNING,
        wfields.NotificationPriority.ERROR,
        wfields.NotificationPriority.CRITICAL,
    ))
}
@base.WatcherObjectRegistry.register_if(False)
class NotificationObject(base.WatcherObject):
    """Common ancestor of every notification related versioned object.

    Both ``save()`` and ``obj_load_attr()`` unconditionally raise
    ``UnsupportedError``.
    """

    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self, **kwargs):
        """Build the object from ``kwargs`` and clear its change tracking."""
        super().__init__(**kwargs)
        # Notification objects are instantiated on the fly at the moment
        # watcher emits them, so right after construction every field is
        # flagged as changed. That carries no useful information for the
        # consumer, hence the change list is wiped immediately.
        self.obj_reset_changes(recursive=False)

    def save(self, context):
        """Not supported on notification objects.

        :raises: UnsupportedError, always.
        """
        raise exception.UnsupportedError()

    def obj_load_attr(self, attrname):
        """Not supported on notification objects.

        :raises: UnsupportedError, always.
        """
        raise exception.UnsupportedError()
@base.WatcherObjectRegistry.register_notification
class EventType(NotificationObject):
    """Versioned object describing the event type of a notification.

    It is made of the name of the object the event is about, the action
    performed on it and, optionally, the phase of that action.
    """

    # Version 1.0: Initial version
    # Version 1.1: Added STRATEGY action in NotificationAction enum
    # Version 1.2: Added PLANNER action in NotificationAction enum
    # Version 1.3: Added EXECUTION action in NotificationAction enum
    VERSION = '1.3'

    fields = {
        'object': wfields.StringField(),
        'action': wfields.NotificationActionField(),
        'phase': wfields.NotificationPhaseField(nullable=True),
    }

    def to_notification_event_type_field(self):
        """Serialize the object to the wire format.

        :return: the string ``<object>.<action>``, followed by
                 ``.<phase>`` only when the phase has been set to a value
                 other than None.
        """
        s = '%s.%s' % (self.object, self.action)
        # 'phase' is declared nullable, so it may be explicitly set to None.
        # Checking only obj_attr_is_set() would then produce a bogus
        # '<object>.<action>.None' event type; a None phase is treated the
        # same way as an unset one.
        if self.obj_attr_is_set('phase') and self.phase is not None:
            s += '.%s' % self.phase
        return s
@base.WatcherObjectRegistry.register_if(False)
class NotificationPayloadBase(NotificationObject):
    """Common ancestor of the payloads carried by versioned notifications."""

    # SCHEMA describes how the payload fields get their values. It is a
    # dictionary whose entries all follow this layout:
    #
    #     <payload_field_name>: (<data_source_name>,
    #                            <field_of_the_data_source>)
    #
    # * <payload_field_name> is the payload attribute receiving the data; it
    #   has to be declared as a field of the payload.
    # * <data_source_name> is the name of the keyword argument given to the
    #   payload's populate_schema() call; the object passed under that name
    #   is used as the source of the data.
    # * <field_of_the_data_source> has to be a valid field of that object.
    #
    # payload.<payload_field_name> is set from
    # <data_source_name>.<field_of_the_data_source>. The data source itself
    # never becomes part of the payload object, neither in its internal nor
    # in its external representation.
    #
    # The SCHEMA has to be applied through populate_schema() before the
    # notification can be emitted. Payload fields that the SCHEMA does not
    # cover can be filled like the fields of any other versioned object.
    SCHEMA = {}

    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self, **kwargs):
        """Build the payload; it starts out populated iff SCHEMA is empty."""
        super().__init__(**kwargs)
        self.populated = not self.SCHEMA

    def populate_schema(self, **kwargs):
        """Fill the payload fields from the source objects as per SCHEMA.

        :param kwargs: the source objects, each one passed under the data
                       source name that the SCHEMA uses for it
        """
        for payload_field, (source_name, source_field) in self.SCHEMA.items():
            source = kwargs[source_name]
            if not source.obj_attr_is_set(source_field):
                # Nothing to copy for a field the source never got set.
                continue
            setattr(self, payload_field, getattr(source, source_field))
        self.populated = True

        # Copying the values above flagged the fields as changed; that
        # information is of no use in the notification, so discard it.
        self.obj_reset_changes(recursive=False)
@base.WatcherObjectRegistry.register_notification
class NotificationPublisher(NotificationObject):
    """Versioned object holding the host and binary of a publisher."""

    # Version 1.0: Initial version
    VERSION = '1.0'

    # Both fields are mandatory (non-nullable) strings.
    fields = {
        'host': wfields.StringField(nullable=False),
        'binary': wfields.StringField(nullable=False),
    }
@base.WatcherObjectRegistry.register_if(False)
class NotificationBase(NotificationObject):
    """Common ancestor of the versioned notifications.

    Subclasses are required to declare a 'payload' field.
    """

    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        'priority': wfields.NotificationPriorityField(),
        'event_type': wfields.ObjectField('EventType'),
        'publisher': wfields.ObjectField('NotificationPublisher'),
    }

    def save(self, context):
        """Not supported on notifications.

        :raises: UnsupportedError, always.
        """
        raise exception.UnsupportedError()

    def obj_load_attr(self, attrname):
        """Not supported on notifications.

        :raises: UnsupportedError, always.
        """
        raise exception.UnsupportedError()

    def _should_notify(self):
        """Tell whether this notification is severe enough to be sent.

        The priority of the notification is compared with the level set
        in the configuration, using the ordering DEBUG < INFO < WARNING <
        ERROR < CRITICAL; the notification goes out when its priority is
        at least the configured level. With no level configured nothing is
        sent.

        :return: True if notification should be sent, False otherwise.
        """
        threshold = CONF.notification_level
        if not threshold:
            return False
        own_rank = NOTIFY_LEVELS[self.priority]
        return own_rank >= NOTIFY_LEVELS[threshold]

    def _emit(self, context, event_type, publisher_id, payload):
        """Hand the notification over to the notifier of ``publisher_id``.

        The notifier method that gets called is the one named after the
        priority of this notification.
        """
        send = getattr(rpc.get_notifier(publisher_id), self.priority)
        LOG.debug("Emitting notification `%s`", event_type)
        send(context, event_type=event_type, payload=payload)

    def emit(self, context):
        """Send the notification.

        Returns without doing anything when the notification does not pass
        the configured level.

        :raises: NotificationPayloadError if the payload was not populated.
        """
        if not self._should_notify():
            return

        payload = self.payload
        if not payload.populated:
            raise exception.NotificationPayloadError(
                class_name=self.__class__.__name__)

        # Note(gibi): the payload is a freshly populated object, so all of
        # its fields appear as changed. This tells the consumer nothing,
        # therefore it is stripped before serialization.
        payload.obj_reset_changes(recursive=False)

        wire_event_type = self.event_type.to_notification_event_type_field()
        publisher_id = '%s:%s' % (self.publisher.binary, self.publisher.host)
        self._emit(
            context,
            event_type=wire_event_type,
            publisher_id=publisher_id,
            payload=payload.obj_to_primitive())
def notification_sample(sample):
    """Provide a notification sample of the decorated notification.

    Class decorator to attach the notification sample information
    to the notification object for documentation generation purposes.
    The decorator can be stacked; the samples are kept in the order in
    which the decorators are applied.

    :param sample: the path of the sample json file relative to the
                   doc/notification_samples/ directory in the watcher
                   repository root.
    :return: a decorator that records ``sample`` in the ``samples`` list of
             the class and returns the very same class.
    """
    def wrap(cls):
        # getattr() also finds a 'samples' list inherited from a decorated
        # parent class. Appending to it in place would leak this sample
        # into the parent (and its other subclasses), so a fresh list is
        # always assigned to the decorated class itself.
        known = getattr(cls, 'samples', None) or []
        cls.samples = [*known, sample]
        return cls
    return wrap