Coverage for watcher/decision_engine/scheduling.py: 97%
60 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2016 b<>com
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17import datetime
19import eventlet
20from oslo_log import log
22from watcher.common import context
23from watcher.common import exception
24from watcher.common import scheduling
26from watcher.decision_engine.model.collector import manager
27from watcher import objects
29from watcher import conf
# Module-level logger, named after this module.
31LOG = log.getLogger(__name__)
# Alias for the ``CONF`` object exposed by ``watcher.conf``.
32CONF = conf.CONF
class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService):
    """Background scheduler holding the decision engine's periodic jobs.

    When started, it registers one synchronization job per collector
    returned by the collector manager, registers the action plan
    ``check_expired`` job (unless expiry is disabled in configuration),
    and cancels ONESHOT audits still recorded as ONGOING for this host.
    """

    def __init__(self, gconfig=None, **options):
        """Initialize the scheduler and create the collector manager.

        :param gconfig: optional scheduler configuration forwarded to the
            parent class; an empty dict is used when nothing is supplied
        :param options: extra keyword options forwarded to the parent class
        """
        # This used to read ``None or {}``, which always evaluated to an
        # empty dict and silently dropped whatever the caller passed in.
        gconfig = gconfig or {}
        super(DecisionEngineSchedulingService, self).__init__(
            gconfig, **options)
        self.collector_manager = manager.CollectorManager()

    @property
    def collectors(self):
        """Return the collectors exposed by the collector manager."""
        return self.collector_manager.get_collectors()

    def add_sync_jobs(self):
        """Register one interval job per collector.

        Each job runs the collector's synchronization wrapped with a
        timeout, repeats every ``collector.config.period`` seconds and is
        scheduled to run for the first time immediately.
        """
        for name, collector in self.collectors.items():
            timed_task = self._wrap_collector_sync_with_timeout(
                collector, name)
            self.add_job(timed_task,
                         trigger='interval',
                         seconds=collector.config.period,
                         next_run_time=datetime.datetime.now())

    def _as_timed_sync_func(self, sync_func, name, timeout):
        """Return a callable running ``sync_func`` under a timeout.

        :param sync_func: zero-argument callable to execute
        :param name: collector name, reported in the timeout exception
        :param timeout: value handed to ``eventlet.Timeout``
        :returns: a zero-argument callable; when the timeout fires,
            ``ClusterDataModelCollectionError`` built with ``cdm=name``
            is the exception passed to ``eventlet.Timeout``
        """
        def _timed_sync():
            with eventlet.Timeout(
                timeout,
                exception=exception.ClusterDataModelCollectionError(cdm=name)
            ):
                sync_func()

        return _timed_sync

    def _wrap_collector_sync_with_timeout(self, collector, name):
        """Add an execution timeout constraint on a collector's sync.

        :param collector: collector whose ``synchronize`` is wrapped; its
            ``config.period`` is used as the timeout
        :param name: collector name, forwarded to the timeout wrapper
        :returns: a zero-argument callable that never propagates
            ``Exception``: failures are logged and the collector's
            cluster data model is flagged as stale instead
        """
        timeout = collector.config.period

        def _sync():
            try:
                # Built on every run so that ``collector.synchronize`` is
                # looked up at execution time, inside the ``try``.
                timed_sync = self._as_timed_sync_func(
                    collector.synchronize, name, timeout)
                timed_sync()
            except Exception as exc:
                LOG.exception(exc)
                collector.set_cluster_data_model_as_stale()

        return _sync

    def add_checkstate_job(self):
        """Register the periodic action plan expiry check.

        The job is only added when the ``action_plan_expiry`` option is
        non-zero; it then runs immediately and every
        ``check_periodic_interval`` seconds afterwards.
        """
        # The interval is read from configuration (the original comment
        # described it as a 30 minutes interval -- confirm against the
        # option's definition).
        interval = CONF.watcher_decision_engine.check_periodic_interval
        ap_manager = objects.action_plan.StateManager()
        if CONF.watcher_decision_engine.action_plan_expiry != 0:
            self.add_job(ap_manager.check_expired, 'interval',
                         args=[context.make_context()],
                         seconds=interval,
                         next_run_time=datetime.datetime.now())

    def cancel_ongoing_audits(self):
        """Cancel ONESHOT audits still marked ONGOING for this host.

        Audits are selected by type, state and ``CONF.host``; each match
        is moved to the CANCELLED state, saved, and reported in the log.
        """
        audit_filters = {
            'audit_type': objects.audit.AuditType.ONESHOT.value,
            'state': objects.audit.State.ONGOING,
            'hostname': CONF.host
        }
        local_context = context.make_context()
        ongoing_audits = objects.Audit.list(
            local_context,
            filters=audit_filters)
        for audit in ongoing_audits:
            audit.state = objects.audit.State.CANCELLED
            audit.save()
            # The message reports ONGOING on purpose: it is the state the
            # audit was found in, not the one it has just been moved to.
            LOG.info("Audit %(uuid)s has been cancelled because it was in "
                     "%(state)s state when Decision Engine had been stopped "
                     "on %(hostname)s host.",
                     {'uuid': audit.uuid,
                      'state': objects.audit.State.ONGOING,
                      'hostname': audit.hostname})

    def start(self):
        """Start service.

        Registers the sync and check-state jobs, cancels leftover ongoing
        audits, then starts the underlying scheduler.
        """
        self.add_sync_jobs()
        self.add_checkstate_job()
        self.cancel_ongoing_audits()
        super(DecisionEngineSchedulingService, self).start()

    def stop(self):
        """Stop service by shutting the scheduler down."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """