Coverage for watcher/decision_engine/audit/continuous.py: 82%
121 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica LTD
# Copyright (c) 2016 Intel Corp
#
# Authors: Alexander Chadin <a.chadin@servionica.ru>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from croniter import croniter
from dateutil import tz
from oslo_utils import timeutils

from watcher.common import context
from watcher.common import scheduling
from watcher.common import utils
from watcher import conf
from watcher.db.sqlalchemy import api as sq_api
from watcher.db.sqlalchemy import job_store
from watcher.decision_engine.audit import base
from watcher import objects


# Shared configuration object; this module reads ``CONF.host`` and
# ``CONF.watcher_decision_engine.continuous_audit_interval`` from it.
CONF = conf.CONF
class ContinuousAuditHandler(base.AuditHandler):
    """Audit handler that keeps CONTINUOUS audits scheduled and runs them.

    Two schedulers cooperate:

    * ``period_scheduler`` repeatedly calls
      :meth:`launch_audits_periodically`;
    * ``scheduler`` (backed by ``WatcherJobStore``) holds the
      ``execute_audit`` jobs, one per scheduled audit.
    """

    def __init__(self):
        super().__init__()
        # scheduler for executing audits
        self._audit_scheduler = None
        # scheduler for a periodic task to launch audit
        self._period_scheduler = None
        # Admin context created with show_deleted=True; used when an audit
        # is re-read in _is_audit_inactive().
        self.context_show_deleted = context.RequestContext(
            is_admin=True, show_deleted=True)

    @property
    def scheduler(self):
        """Lazily created scheduler that stores the audit execution jobs."""
        if self._audit_scheduler is None:
            self._audit_scheduler = scheduling.BackgroundSchedulerService(
                jobstores={
                    'default': job_store.WatcherJobStore(
                        engine=sq_api.enginefacade.writer.get_engine()),
                }
            )
        return self._audit_scheduler

    @property
    def period_scheduler(self):
        """Lazily created scheduler that drives the periodic launcher."""
        if self._period_scheduler is None:
            self._period_scheduler = scheduling.BackgroundSchedulerService()
        return self._period_scheduler

    def _is_audit_inactive(self, audit):
        """Tell whether *audit* must not run on this host; drop its job if so.

        The audit is re-read by UUID with ``context_show_deleted`` and is
        considered inactive when its state is inactive, when it is assigned
        to a host other than ``CONF.host``, or when
        :meth:`check_audit_expired` reports it as expired.

        :param audit: audit object whose ``uuid`` identifies the record
        :returns: True if the audit is inactive, False otherwise
        """
        audit = objects.Audit.get_by_uuid(
            self.context_show_deleted, audit.uuid, eager=True)
        if (objects.audit.AuditStateTransitionManager().is_inactive(audit) or
                (audit.hostname != CONF.host) or
                (self.check_audit_expired(audit))):
            # if audit isn't in active states, audit's job must be removed to
            # prevent using of inactive audit in future.
            jobs = [job for job in self.scheduler.get_jobs()
                    if job.name == 'execute_audit' and
                    job.args[0].uuid == audit.uuid]
            if jobs:
                jobs[0].remove()
            return True

        return False

    def do_execute(self, audit, request_context):
        """Execute *audit* through the base handler and return its solution.

        For a CONTINUOUS audit, every action plan of this audit that is in
        the RECOMMENDED state once the base execution returns is switched to
        CANCELLED and saved.

        :param audit: audit object to execute
        :param request_context: context used to list the action plans
        :returns: whatever the base class ``do_execute`` returned
        """
        solution = super().do_execute(audit, request_context)

        if audit.audit_type == objects.audit.AuditType.CONTINUOUS.value:
            a_plan_filters = {'audit_uuid': audit.uuid,
                              'state': objects.action_plan.State.RECOMMENDED}
            action_plans = objects.ActionPlan.list(
                request_context, filters=a_plan_filters, eager=True)
            for plan in action_plans:
                plan.state = objects.action_plan.State.CANCELLED
                plan.save()
        return solution

    @staticmethod
    def _next_cron_time(audit):
        """Return the next UTC run time for a cron-like ``audit.interval``.

        :returns: a ``datetime.datetime`` strictly derived from the current
            UTC time, or None when the interval is not cron-like
        """
        if utils.is_cron_like(audit.interval):
            return croniter(audit.interval, timeutils.utcnow()
                            ).get_next(datetime.datetime)
        return None

    @staticmethod
    def _seconds_until_next_slot(interval, elapsed):
        """Return the delay, in seconds, to the next slot of a missed schedule.

        :param interval: audit period in whole seconds
        :param elapsed: ``datetime.timedelta`` elapsed since the missed run
        :returns: an int in the range ``1..interval``
        """
        # total_seconds() is required here: timedelta.seconds excludes the
        # ``days`` component and would give a wrong phase after more than a
        # day of downtime.
        return interval - int(elapsed.total_seconds()) % interval

    @classmethod
    def execute_audit(cls, audit, request_context):
        """Scheduler job entry point: run *audit* and store its next run time.

        Nothing happens when the audit is inactive (its job is removed by
        :meth:`_is_audit_inactive`). Otherwise the audit is executed and,
        whether or not execution raised, ``audit.next_run_time`` is updated
        and the audit is saved; any exception then propagates to the caller.
        """
        handler = cls()
        if handler._is_audit_inactive(audit):
            return
        try:
            handler.execute(audit, request_context)
        finally:
            if utils.is_int_like(audit.interval):
                audit.next_run_time = (
                    timeutils.utcnow() +
                    datetime.timedelta(seconds=int(audit.interval)))
            else:
                audit.next_run_time = handler._next_cron_time(audit)
            audit.save()

    def _add_job(self, trigger, audit, audit_context, **trigger_args):
        """Register an ``execute_audit`` job for *audit* on the scheduler.

        :param trigger: trigger name forwarded to ``self.scheduler.add_job``
            (this class passes ``'interval'`` or ``'date'``)
        :param audit: audit object passed to ``execute_audit``
        :param audit_context: request context passed to ``execute_audit``
        :param trigger_args: extra trigger arguments; must contain a truthy
            ``next_run_time`` or else a ``run_date``, expressed in UTC
        :raises KeyError: if ``next_run_time`` is missing/falsy and
            ``run_date`` is not supplied
        """
        time_var = 'next_run_time' if trigger_args.get(
            'next_run_time') else 'run_date'
        # We should convert UTC time to local time without tzinfo
        trigger_args[time_var] = trigger_args[time_var].replace(
            tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
        self.scheduler.add_job(self.execute_audit, trigger,
                               args=[audit, audit_context],
                               name='execute_audit',
                               **trigger_args)

    def check_audit_expired(self, audit):
        """Tell whether *audit* is outside its start/end time frame.

        An audit whose ``end_time`` has passed is additionally moved to the
        SUCCEEDED state (and saved) if it is not already in that state.

        :returns: True when the audit has not started yet or has already
            ended, False otherwise
        """
        current = timeutils.utcnow()
        # Note: if audit still didn't get into the timeframe,
        # skip it
        if audit.start_time and audit.start_time > current:
            return True
        if audit.end_time and audit.end_time < current:
            if audit.state != objects.audit.State.SUCCEEDED:
                audit.state = objects.audit.State.SUCCEEDED
                audit.save()
            return True

        return False

    def launch_audits_periodically(self):
        """Synchronise scheduler jobs with the continuous audits in the DB.

        Steps, in order:

        1. restart the audit scheduler if it is not running;
        2. assign ``CONF.host`` to PENDING/ONGOING continuous audits that
           have no hostname yet;
        3. drop jobs whose audit became inactive;
        4. for each audit of this host that is not expired, add a job if it
           has none, or replace the job if the audit interval changed.
        """
        # if audit scheduler stop, restart it
        if not self.scheduler.running:
            self.scheduler.start()

        audit_context = context.RequestContext(is_admin=True)
        audit_filters = {
            'audit_type': objects.audit.AuditType.CONTINUOUS.value,
            'state__in': (objects.audit.State.PENDING,
                          objects.audit.State.ONGOING),
            'hostname': None,
        }
        unscheduled_audits = objects.Audit.list(
            audit_context, filters=audit_filters, eager=True)
        for audit in unscheduled_audits:
            # If continuous audit doesn't have a hostname yet,
            # Watcher will set current CONF.host value.
            # TODO(alexchadin): Add scheduling of new continuous audits.
            audit.hostname = CONF.host
            audit.save()

        scheduler_jobs = {
            job.args[0].uuid: job
            for job in self.scheduler.get_jobs()
            if job.name == 'execute_audit'}
        # if audit isn't in active states, audit's job should be removed;
        # collect first so the dict is not mutated while being iterated.
        jobs_to_remove = [
            audit_uuid for audit_uuid, job in scheduler_jobs.items()
            if self._is_audit_inactive(job.args[0])]
        for audit_uuid in jobs_to_remove:
            scheduler_jobs.pop(audit_uuid)

        audit_filters['hostname'] = CONF.host
        audits = objects.Audit.list(
            audit_context, filters=audit_filters, eager=True)
        for audit in audits:
            if self.check_audit_expired(audit):
                continue

            existing_job = scheduler_jobs.get(audit.uuid)
            if existing_job is not None:
                if audit.interval == existing_job.args[0].interval:
                    # already scheduled with the current interval
                    continue
                # interval has changed: remove the old job, a new one is
                # added below.
                self.scheduler.remove_job(existing_job.id)

            # if interval is provided with seconds
            if utils.is_int_like(audit.interval):
                interval = int(audit.interval)
                # if audit has already been provided and we need
                # to restore it after shutdown
                if audit.next_run_time is not None:
                    current = timeutils.utcnow()
                    if audit.next_run_time < current:
                        # The stored run time was missed: move it to the
                        # next slot that keeps the original cadence.
                        delay = self._seconds_until_next_slot(
                            interval, current - audit.next_run_time)
                        audit.next_run_time = (
                            current + datetime.timedelta(seconds=delay))
                    next_run_time = audit.next_run_time
                # if audit is new one
                else:
                    next_run_time = timeutils.utcnow()
                self._add_job('interval', audit, audit_context,
                              seconds=interval,
                              next_run_time=next_run_time)
            else:
                audit.next_run_time = self._next_cron_time(audit)
                self._add_job('date', audit, audit_context,
                              run_date=audit.next_run_time)
            audit.hostname = CONF.host
            audit.save()

    def start(self):
        """Start the periodic launcher and the audit scheduler.

        The launcher job runs every
        ``CONF.watcher_decision_engine.continuous_audit_interval`` seconds,
        with its first run requested for the current local time.
        """
        self.period_scheduler.add_job(
            self.launch_audits_periodically,
            'interval',
            seconds=CONF.watcher_decision_engine.continuous_audit_interval,
            next_run_time=datetime.datetime.now())
        self.period_scheduler.start()
        # audit scheduler start
        self.scheduler.start()