Coverage for watcher/api/scheduling.py: 80%

56 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 Servionica 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17 

18import datetime 

19import itertools 

20from oslo_config import cfg 

21from oslo_log import log 

22from oslo_utils import timeutils 

23 

24from watcher.common import context as watcher_context 

25from watcher.common import scheduling 

26from watcher import notifications 

27 

28from watcher import objects 

29 

30CONF = cfg.CONF 

31LOG = log.getLogger(__name__) 

32 

33 

class APISchedulingService(scheduling.BackgroundSchedulerService):
    """Background scheduler that tracks the liveness of Watcher services.

    A periodic job compares every service's last heartbeat against
    ``CONF.service_down_time``, sends a notification when a service's
    status changes and, when a ``watcher-decision-engine`` turns FAILED,
    reassigns its ongoing continuous audits to the active decision engines.
    """

    def __init__(self, gconfig=None, **options):
        """Initialise the scheduler.

        :param gconfig: scheduler configuration dict forwarded to the parent
            class; ``None`` (the default) is forwarded as a fresh empty dict
            so that no dict instance is shared between calls.
        :param options: extra keyword options forwarded to the parent class.
        """
        # Last observed status of each service, keyed by service id.
        self.services_status = {}
        super().__init__({} if gconfig is None else gconfig, **options)

    def get_services_status(self, context):
        """Refresh the cached service statuses and react to changes.

        For every listed service the current status is computed. The first
        time a service is seen its status is only recorded. On a later change
        a service-update notification is sent, and if the service is a
        ``watcher-decision-engine`` that became FAILED, its ongoing
        continuous audits are spread round-robin over the hosts of the
        decision engines currently recorded as ACTIVE.

        :param context: request context used for object queries and
            notifications.
        """
        services = objects.service.Service.list(context)
        active_s = objects.service.ServiceStatus.ACTIVE
        failed_s = objects.service.ServiceStatus.FAILED
        for service in services:
            result = self.get_service_status(context, service.id)
            if service.id not in self.services_status:
                # First sighting: record a baseline, nothing to announce.
                self.services_status[service.id] = result
                continue
            if self.services_status[service.id] == result:
                continue

            self.services_status[service.id] = result
            notifications.service.send_service_update(context, service,
                                                      state=result)
            if (result != failed_s or
                    service.name != 'watcher-decision-engine'):
                continue

            audit_filters = {
                'audit_type': objects.audit.AuditType.CONTINUOUS.value,
                'state': objects.audit.State.ONGOING,
                'hostname': service.host
            }
            ongoing_audits = objects.Audit.list(
                context,
                filters=audit_filters,
                eager=True)
            # .get(): a service present in this listing may not have been
            # recorded yet (it is handled later in this loop); treat it as
            # not alive instead of raising KeyError.
            alive_services = [
                s.host for s in services
                if (self.services_status.get(s.id) == active_s and
                    s.name == 'watcher-decision-engine')]
            if not alive_services:
                # Cycling over an empty list would raise StopIteration and
                # abort the status check for all remaining services.
                if ongoing_audits:
                    LOG.warning('No active watcher-decision-engine is '
                                'available to take over the audits of '
                                '%(failed_host)s',
                                {'failed_host': service.host})
                continue

            round_robin = itertools.cycle(alive_services)
            for audit in ongoing_audits:
                audit.hostname = next(round_robin)
                audit.save()
                LOG.info('Audit %(audit)s has been migrated to '
                         '%(host)s since %(failed_host)s is in'
                         ' %(state)s',
                         {'audit': audit.uuid,
                          'host': audit.hostname,
                          'failed_host': service.host,
                          'state': failed_s})

    def get_service_status(self, context, service_id):
        """Return the status of one service based on its last heartbeat.

        The heartbeat is the first non-empty value among ``last_seen_up``,
        ``updated_at`` and ``created_at``. The service is considered up when
        the absolute time elapsed since then does not exceed
        ``CONF.service_down_time``.

        :param context: request context used to load the service.
        :param service_id: id of the service to check.
        :returns: ``ServiceStatus.ACTIVE`` or ``ServiceStatus.FAILED``.
        """
        service = objects.Service.get(context, service_id)
        last_heartbeat = (service.last_seen_up or service.updated_at or
                          service.created_at)
        if isinstance(last_heartbeat, str):
            # NOTE(russellb) If this service came in over rpc via
            # conductor, then the timestamp will be a string and needs to be
            # converted back to a datetime.
            last_heartbeat = timeutils.parse_strtime(last_heartbeat)
        else:
            # Objects have proper UTC timezones, but the timeutils comparison
            # below does not (and will fail)
            last_heartbeat = last_heartbeat.replace(tzinfo=None)
        elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow())
        # abs(): a heartbeat stamped slightly in the future still counts.
        is_up = abs(elapsed) <= CONF.service_down_time
        if not is_up:
            LOG.warning('Seems service %(name)s on host %(host)s is down. '
                        'Last heartbeat was %(lhb)s. Elapsed time is %(el)s',
                        {'name': service.name,
                         'host': service.host,
                         'lhb': str(last_heartbeat), 'el': str(elapsed)})
            return objects.service.ServiceStatus.FAILED

        return objects.service.ServiceStatus.ACTIVE

    def start(self):
        """Start service.

        Registers ``get_services_status`` as an interval job that runs every
        ``CONF.periodic_interval`` seconds with an admin context, then starts
        the scheduler.
        """
        context = watcher_context.make_context(is_admin=True)
        # next_run_time is set to "now" -- presumably so that the first
        # check fires right away instead of after one full interval.
        self.add_job(self.get_services_status, name='service_status',
                     trigger='interval', jobstore='default', args=[context],
                     next_run_time=datetime.datetime.now(),
                     seconds=CONF.periodic_interval)
        super().start()

    def stop(self):
        """Stop service."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """

126 """