Coverage for watcher/notifications/audit.py: 98%
116 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2016 b<>com
3#
4# Authors: Vincent FRANCOISE <vincent.francoise@b-com.com>
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
19from oslo_config import cfg
21from watcher.common import exception
22from watcher.notifications import base as notificationbase
23from watcher.notifications import exception as exception_notifications
24from watcher.notifications import goal as goal_notifications
25from watcher.notifications import strategy as strategy_notifications
26from watcher.objects import base
27from watcher.objects import fields as wfields
29CONF = cfg.CONF
32@base.WatcherObjectRegistry.register_notification
33class TerseAuditPayload(notificationbase.NotificationPayloadBase):
34 SCHEMA = {
35 'uuid': ('audit', 'uuid'),
36 'name': ('audit', 'name'),
37 'audit_type': ('audit', 'audit_type'),
38 'state': ('audit', 'state'),
39 'parameters': ('audit', 'parameters'),
40 'interval': ('audit', 'interval'),
41 'scope': ('audit', 'scope'),
42 'auto_trigger': ('audit', 'auto_trigger'),
43 'next_run_time': ('audit', 'next_run_time'),
45 'created_at': ('audit', 'created_at'),
46 'updated_at': ('audit', 'updated_at'),
47 'deleted_at': ('audit', 'deleted_at'),
48 }
50 # Version 1.0: Initial version
51 # Version 1.1: Added 'auto_trigger' boolean field,
52 # Added 'next_run_time' DateTime field,
53 # 'interval' type has been changed from Integer to String
54 # Version 1.2: Added 'name' string field
55 VERSION = '1.2'
57 fields = {
58 'uuid': wfields.UUIDField(),
59 'name': wfields.StringField(),
60 'audit_type': wfields.StringField(),
61 'state': wfields.StringField(),
62 'parameters': wfields.FlexibleDictField(nullable=True),
63 'interval': wfields.StringField(nullable=True),
64 'scope': wfields.FlexibleListOfDictField(nullable=True),
65 'goal_uuid': wfields.UUIDField(),
66 'strategy_uuid': wfields.UUIDField(nullable=True),
67 'auto_trigger': wfields.BooleanField(),
68 'next_run_time': wfields.DateTimeField(nullable=True),
70 'created_at': wfields.DateTimeField(nullable=True),
71 'updated_at': wfields.DateTimeField(nullable=True),
72 'deleted_at': wfields.DateTimeField(nullable=True),
73 }
75 def __init__(self, audit, goal_uuid, strategy_uuid=None, **kwargs):
76 super(TerseAuditPayload, self).__init__(
77 goal_uuid=goal_uuid, strategy_uuid=strategy_uuid, **kwargs)
78 self.populate_schema(audit=audit)
81@base.WatcherObjectRegistry.register_notification
82class AuditPayload(TerseAuditPayload):
83 SCHEMA = {
84 'uuid': ('audit', 'uuid'),
85 'name': ('audit', 'name'),
86 'audit_type': ('audit', 'audit_type'),
87 'state': ('audit', 'state'),
88 'parameters': ('audit', 'parameters'),
89 'interval': ('audit', 'interval'),
90 'scope': ('audit', 'scope'),
91 'auto_trigger': ('audit', 'auto_trigger'),
92 'next_run_time': ('audit', 'next_run_time'),
94 'created_at': ('audit', 'created_at'),
95 'updated_at': ('audit', 'updated_at'),
96 'deleted_at': ('audit', 'deleted_at'),
97 }
99 # Version 1.0: Initial version
100 # Version 1.1: Added 'auto_trigger' field,
101 # Added 'next_run_time' field
102 # Version 1.2: Added 'name' string field
103 VERSION = '1.2'
105 fields = {
106 'goal': wfields.ObjectField('GoalPayload'),
107 'strategy': wfields.ObjectField('StrategyPayload', nullable=True),
108 }
110 def __init__(self, audit, goal, strategy=None, **kwargs):
111 if not kwargs.get('goal_uuid'): 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true
112 kwargs['goal_uuid'] = goal.uuid
114 if strategy and not kwargs.get('strategy_uuid'):
115 kwargs['strategy_uuid'] = strategy.uuid
117 super(AuditPayload, self).__init__(
118 audit=audit, goal=goal,
119 strategy=strategy, **kwargs)
122@base.WatcherObjectRegistry.register_notification
123class AuditStateUpdatePayload(notificationbase.NotificationPayloadBase):
124 # Version 1.0: Initial version
125 VERSION = '1.0'
127 fields = {
128 'old_state': wfields.StringField(nullable=True),
129 'state': wfields.StringField(nullable=True),
130 }
133@base.WatcherObjectRegistry.register_notification
134class AuditCreatePayload(AuditPayload):
135 # Version 1.0: Initial version
136 # Version 1.1: Added 'auto_trigger' field,
137 # Added 'next_run_time' field
138 VERSION = '1.1'
139 fields = {}
141 def __init__(self, audit, goal, strategy):
142 super(AuditCreatePayload, self).__init__(
143 audit=audit,
144 goal=goal,
145 goal_uuid=goal.uuid,
146 strategy=strategy)
149@base.WatcherObjectRegistry.register_notification
150class AuditUpdatePayload(AuditPayload):
151 # Version 1.0: Initial version
152 # Version 1.1: Added 'auto_trigger' field,
153 # Added 'next_run_time' field
154 VERSION = '1.1'
155 fields = {
156 'state_update': wfields.ObjectField('AuditStateUpdatePayload'),
157 }
159 def __init__(self, audit, state_update, goal, strategy):
160 super(AuditUpdatePayload, self).__init__(
161 audit=audit,
162 state_update=state_update,
163 goal=goal,
164 goal_uuid=goal.uuid,
165 strategy=strategy)
168@base.WatcherObjectRegistry.register_notification
169class AuditActionPayload(AuditPayload):
170 # Version 1.0: Initial version
171 # Version 1.1: Added 'auto_trigger' field,
172 # Added 'next_run_time' field
173 VERSION = '1.1'
174 fields = {
175 'fault': wfields.ObjectField('ExceptionPayload', nullable=True),
176 }
178 def __init__(self, audit, goal, strategy, **kwargs):
179 super(AuditActionPayload, self).__init__(
180 audit=audit,
181 goal=goal,
182 goal_uuid=goal.uuid,
183 strategy=strategy,
184 **kwargs)
187@base.WatcherObjectRegistry.register_notification
188class AuditDeletePayload(AuditPayload):
189 # Version 1.0: Initial version
190 # Version 1.1: Added 'auto_trigger' field,
191 # Added 'next_run_time' field
192 VERSION = '1.1'
193 fields = {}
195 def __init__(self, audit, goal, strategy):
196 super(AuditDeletePayload, self).__init__(
197 audit=audit,
198 goal=goal,
199 goal_uuid=goal.uuid,
200 strategy=strategy)
203@notificationbase.notification_sample('audit-strategy-error.json')
204@notificationbase.notification_sample('audit-strategy-end.json')
205@notificationbase.notification_sample('audit-strategy-start.json')
206@base.WatcherObjectRegistry.register_notification
207class AuditActionNotification(notificationbase.NotificationBase):
208 # Version 1.0: Initial version
209 VERSION = '1.0'
211 fields = {
212 'payload': wfields.ObjectField('AuditActionPayload')
213 }
216@notificationbase.notification_sample('audit-create.json')
217@base.WatcherObjectRegistry.register_notification
218class AuditCreateNotification(notificationbase.NotificationBase):
219 # Version 1.0: Initial version
220 VERSION = '1.0'
222 fields = {
223 'payload': wfields.ObjectField('AuditCreatePayload')
224 }
227@notificationbase.notification_sample('audit-update.json')
228@base.WatcherObjectRegistry.register_notification
229class AuditUpdateNotification(notificationbase.NotificationBase):
230 # Version 1.0: Initial version
231 VERSION = '1.0'
233 fields = {
234 'payload': wfields.ObjectField('AuditUpdatePayload')
235 }
238@notificationbase.notification_sample('audit-delete.json')
239@base.WatcherObjectRegistry.register_notification
240class AuditDeleteNotification(notificationbase.NotificationBase):
241 # Version 1.0: Initial version
242 VERSION = '1.0'
244 fields = {
245 'payload': wfields.ObjectField('AuditDeletePayload')
246 }
249def _get_common_payload(audit):
250 goal = None
251 strategy = None
252 try:
253 goal = audit.goal
254 if audit.strategy_id:
255 strategy = audit.strategy
256 except NotImplementedError:
257 raise exception.EagerlyLoadedAuditRequired(audit=audit.uuid)
259 goal_payload = goal_notifications.GoalPayload(goal=goal)
261 strategy_payload = None
262 if strategy:
263 strategy_payload = strategy_notifications.StrategyPayload(
264 strategy=strategy)
266 return goal_payload, strategy_payload
269def send_create(context, audit, service='infra-optim', host=None):
270 """Emit an audit.create notification."""
271 goal_payload, strategy_payload = _get_common_payload(audit)
273 versioned_payload = AuditCreatePayload(
274 audit=audit,
275 goal=goal_payload,
276 strategy=strategy_payload,
277 )
279 notification = AuditCreateNotification(
280 priority=wfields.NotificationPriority.INFO,
281 event_type=notificationbase.EventType(
282 object='audit',
283 action=wfields.NotificationAction.CREATE),
284 publisher=notificationbase.NotificationPublisher(
285 host=host or CONF.host,
286 binary=service),
287 payload=versioned_payload)
289 notification.emit(context)
292def send_update(context, audit, service='infra-optim',
293 host=None, old_state=None):
294 """Emit an audit.update notification."""
295 goal_payload, strategy_payload = _get_common_payload(audit)
297 state_update = AuditStateUpdatePayload(
298 old_state=old_state,
299 state=audit.state if old_state else None)
301 versioned_payload = AuditUpdatePayload(
302 audit=audit,
303 state_update=state_update,
304 goal=goal_payload,
305 strategy=strategy_payload,
306 )
308 notification = AuditUpdateNotification(
309 priority=wfields.NotificationPriority.INFO,
310 event_type=notificationbase.EventType(
311 object='audit',
312 action=wfields.NotificationAction.UPDATE),
313 publisher=notificationbase.NotificationPublisher(
314 host=host or CONF.host,
315 binary=service),
316 payload=versioned_payload)
318 notification.emit(context)
321def send_delete(context, audit, service='infra-optim', host=None):
322 goal_payload, strategy_payload = _get_common_payload(audit)
324 versioned_payload = AuditDeletePayload(
325 audit=audit,
326 goal=goal_payload,
327 strategy=strategy_payload,
328 )
330 notification = AuditDeleteNotification(
331 priority=wfields.NotificationPriority.INFO,
332 event_type=notificationbase.EventType(
333 object='audit',
334 action=wfields.NotificationAction.DELETE),
335 publisher=notificationbase.NotificationPublisher(
336 host=host or CONF.host,
337 binary=service),
338 payload=versioned_payload)
340 notification.emit(context)
343def send_action_notification(context, audit, action, phase=None,
344 priority=wfields.NotificationPriority.INFO,
345 service='infra-optim', host=None):
346 """Emit an audit action notification."""
347 goal_payload, strategy_payload = _get_common_payload(audit)
349 fault = None
350 if phase == wfields.NotificationPhase.ERROR:
351 fault = exception_notifications.ExceptionPayload.from_exception()
353 versioned_payload = AuditActionPayload(
354 audit=audit,
355 goal=goal_payload,
356 strategy=strategy_payload,
357 fault=fault,
358 )
360 notification = AuditActionNotification(
361 priority=priority,
362 event_type=notificationbase.EventType(
363 object='audit',
364 action=action,
365 phase=phase),
366 publisher=notificationbase.NotificationPublisher(
367 host=host or CONF.host,
368 binary=service),
369 payload=versioned_payload)
371 notification.emit(context)