Coverage for watcher/notifications/service.py: 92%

38 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 Servionica 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16from oslo_config import cfg 

17 

18from watcher.notifications import base as notificationbase 

19from watcher.objects import base 

20from watcher.objects import fields as wfields 

21from watcher.objects import service as o_service 

22 

23CONF = cfg.CONF 

24 

25 

26@base.WatcherObjectRegistry.register_notification 

27class ServicePayload(notificationbase.NotificationPayloadBase): 

28 

29 SCHEMA = { 

30 'sevice_host': ('failed_service', 'host'), 

31 'name': ('failed_service', 'name'), 

32 'last_seen_up': ('failed_service', 'last_seen_up'), 

33 } 

34 # Version 1.0: Initial version 

35 VERSION = '1.0' 

36 fields = { 

37 'sevice_host': wfields.StringField(), 

38 'name': wfields.StringField(), 

39 'last_seen_up': wfields.DateTimeField(nullable=True), 

40 } 

41 

42 def __init__(self, failed_service, status_update, **kwargs): 

43 super(ServicePayload, self).__init__( 

44 failed_service=failed_service, 

45 status_update=status_update, **kwargs) 

46 self.populate_schema(failed_service=failed_service) 

47 

48 

49@base.WatcherObjectRegistry.register_notification 

50class ServiceStatusUpdatePayload(notificationbase.NotificationPayloadBase): 

51 # Version 1.0: Initial version 

52 VERSION = '1.0' 

53 fields = { 

54 'old_state': wfields.StringField(nullable=True), 

55 'state': wfields.StringField(nullable=True), 

56 } 

57 

58 

59@base.WatcherObjectRegistry.register_notification 

60class ServiceUpdatePayload(ServicePayload): 

61 # Version 1.0: Initial version 

62 VERSION = '1.0' 

63 fields = { 

64 'status_update': wfields.ObjectField('ServiceStatusUpdatePayload'), 

65 } 

66 

67 def __init__(self, failed_service, status_update): 

68 super(ServiceUpdatePayload, self).__init__( 

69 failed_service=failed_service, 

70 status_update=status_update) 

71 

72 

73@notificationbase.notification_sample('service-update.json') 

74@base.WatcherObjectRegistry.register_notification 

75class ServiceUpdateNotification(notificationbase.NotificationBase): 

76 # Version 1.0: Initial version 

77 VERSION = '1.0' 

78 

79 fields = { 

80 'payload': wfields.ObjectField('ServiceUpdatePayload') 

81 } 

82 

83 

84def send_service_update(context, failed_service, state, 

85 service='infra-optim', 

86 host=None): 

87 """Emit an service failed notification.""" 

88 if state == o_service.ServiceStatus.FAILED: 88 ↛ 94line 88 didn't jump to line 94 because the condition on line 88 was always true

89 priority = wfields.NotificationPriority.WARNING 

90 status_update = ServiceStatusUpdatePayload( 

91 old_state=o_service.ServiceStatus.ACTIVE, 

92 state=o_service.ServiceStatus.FAILED) 

93 else: 

94 priority = wfields.NotificationPriority.INFO 

95 status_update = ServiceStatusUpdatePayload( 

96 old_state=o_service.ServiceStatus.FAILED, 

97 state=o_service.ServiceStatus.ACTIVE) 

98 versioned_payload = ServiceUpdatePayload( 

99 failed_service=failed_service, 

100 status_update=status_update 

101 ) 

102 

103 notification = ServiceUpdateNotification( 

104 priority=priority, 

105 event_type=notificationbase.EventType( 

106 object='service', 

107 action=wfields.NotificationAction.UPDATE), 

108 publisher=notificationbase.NotificationPublisher( 

109 host=host or CONF.host, 

110 binary=service), 

111 payload=versioned_payload) 

112 

113 notification.emit(context)