Coverage for watcher/decision_engine/scheduling.py: 97%

60 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 b<>com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17import datetime 

18 

19import eventlet 

20from oslo_log import log 

21 

22from watcher.common import context 

23from watcher.common import exception 

24from watcher.common import scheduling 

25 

26from watcher.decision_engine.model.collector import manager 

27from watcher import objects 

28 

29from watcher import conf 

30 

# Handle on the global Watcher configuration and the module-level logger.
CONF = conf.CONF
LOG = log.getLogger(__name__)

33 

34 

class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService):
    """Background scheduler driving the Decision Engine's periodic work.

    When started it:

    * registers one interval job per cluster data model collector that
      synchronizes that collector's model (see ``add_sync_jobs``);
    * registers a job checking for expired action plans, unless
      ``action_plan_expiry`` is 0 (see ``add_checkstate_job``);
    * cancels ONESHOT audits left in the ONGOING state on this host
      (see ``cancel_ongoing_audits``).
    """

    def __init__(self, gconfig=None, **options):
        """Create the scheduler and its collector manager.

        :param gconfig: global scheduler configuration forwarded to the
            parent scheduler; an empty dict is used when ``None``.
        :param options: extra keyword options forwarded unchanged to the
            parent scheduler.
        """
        # Only substitute the empty default when nothing was supplied, so a
        # caller-provided gconfig actually reaches the parent scheduler.
        if gconfig is None:
            gconfig = {}
        super(DecisionEngineSchedulingService, self).__init__(
            gconfig, **options)
        self.collector_manager = manager.CollectorManager()

    @property
    def collectors(self):
        """Collectors known to the collector manager, keyed by name.

        The result is iterated with ``.items()`` as ``(name, collector)``
        pairs by ``add_sync_jobs``.
        """
        return self.collector_manager.get_collectors()

    def add_sync_jobs(self):
        """Schedule a periodic model synchronization job per collector.

        Each job runs every ``collector.config.period`` seconds, and its
        first run is scheduled for "now".
        """
        for name, collector in self.collectors.items():
            timed_task = self._wrap_collector_sync_with_timeout(
                collector, name)
            self.add_job(timed_task,
                         trigger='interval',
                         seconds=collector.config.period,
                         next_run_time=datetime.datetime.now())

    def _as_timed_sync_func(self, sync_func, name, timeout):
        """Return a callable running ``sync_func`` under a time limit.

        :param sync_func: zero-argument callable to execute.
        :param name: collector name, used to build the timeout exception.
        :param timeout: time limit in seconds passed to ``eventlet.Timeout``.
        :returns: a zero-argument callable. If ``sync_func`` does not finish
            within ``timeout`` seconds, the callable raises
            ``ClusterDataModelCollectionError(cdm=name)``.
        """
        def _timed_sync():
            with eventlet.Timeout(
                timeout,
                exception=exception.ClusterDataModelCollectionError(cdm=name)
            ):
                sync_func()

        return _timed_sync

    def _wrap_collector_sync_with_timeout(self, collector, name):
        """Wrap ``collector.synchronize`` with a timeout and error handling.

        The timeout equals the collector's scheduling period (presumably so
        that one sync cannot overrun into the next scheduled run).

        :param collector: collector whose ``synchronize`` method is wrapped.
        :param name: collector name, used in the timeout exception.
        :returns: a zero-argument callable suitable as a scheduler job. It
            never raises: any exception, including the timeout, is logged
            and the collector's cluster data model is marked as stale.
        """
        timeout = collector.config.period

        def _sync():
            try:
                timed_sync = self._as_timed_sync_func(
                    collector.synchronize, name, timeout)
                timed_sync()
            except Exception as exc:
                # Scheduler jobs must not propagate errors; flag the model
                # as stale so consumers know it may be out of date.
                LOG.exception(exc)
                collector.set_cluster_data_model_as_stale()

        return _sync

    def add_checkstate_job(self):
        """Schedule the periodic expired-action-plan check.

        The job calls ``StateManager.check_expired`` with a freshly made
        context every ``check_periodic_interval`` seconds, starting now. It
        is not registered when ``action_plan_expiry`` is 0 (presumably
        meaning expiry is disabled; confirm against the option's help).
        """
        # Interval in seconds, taken from configuration.
        interval = CONF.watcher_decision_engine.check_periodic_interval
        ap_manager = objects.action_plan.StateManager()
        if CONF.watcher_decision_engine.action_plan_expiry != 0:
            self.add_job(ap_manager.check_expired, 'interval',
                         args=[context.make_context()],
                         seconds=interval,
                         next_run_time=datetime.datetime.now())

    def cancel_ongoing_audits(self):
        """Cancel ONESHOT audits still marked ONGOING on this host.

        Such audits were left in the ONGOING state when the Decision Engine
        on ``CONF.host`` was stopped. Each one is moved to CANCELLED, saved,
        and logged.
        """
        audit_filters = {
            'audit_type': objects.audit.AuditType.ONESHOT.value,
            'state': objects.audit.State.ONGOING,
            'hostname': CONF.host
        }
        local_context = context.make_context()
        ongoing_audits = objects.Audit.list(
            local_context,
            filters=audit_filters)
        for audit in ongoing_audits:
            audit.state = objects.audit.State.CANCELLED
            audit.save()
            LOG.info("Audit %(uuid)s has been cancelled because it was in "
                     "%(state)s state when Decision Engine had been stopped "
                     "on %(hostname)s host.",
                     {'uuid': audit.uuid,
                      'state': objects.audit.State.ONGOING,
                      'hostname': audit.hostname})

    def start(self):
        """Start service.

        Registers the sync and check-state jobs, cancels stale ONGOING
        audits, then starts the underlying scheduler.
        """
        self.add_sync_jobs()
        self.add_checkstate_job()
        self.cancel_ongoing_audits()
        super(DecisionEngineSchedulingService, self).start()

    def stop(self):
        """Stop service by shutting down the scheduler."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete.

        Intentionally a no-op here.
        """

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        Intentionally a no-op here.
        """