Coverage for watcher/decision_engine/audit/continuous.py: 82%

121 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2016 Servionica LTD 

3# Copyright (c) 2016 Intel Corp 

4# 

5# Authors: Alexander Chadin <a.chadin@servionica.ru> 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

16# implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19 

20 

21import datetime 

22 

23from croniter import croniter 

24from dateutil import tz 

25from oslo_utils import timeutils 

26 

27from watcher.common import context 

28from watcher.common import scheduling 

29from watcher.common import utils 

30from watcher import conf 

31from watcher.db.sqlalchemy import api as sq_api 

32from watcher.db.sqlalchemy import job_store 

33from watcher.decision_engine.audit import base 

34from watcher import objects 

35 

36 

# Module-level alias for the global configuration object exposed by
# ``watcher.conf``.
CONF = conf.CONF

38 

39 

class ContinuousAuditHandler(base.AuditHandler):
    """Audit handler that schedules and runs CONTINUOUS audits.

    Two schedulers are used. ``scheduler`` holds one ``execute_audit`` job
    per audit and is backed by ``job_store.WatcherJobStore``.
    ``period_scheduler`` periodically runs
    :meth:`launch_audits_periodically`, which adds and removes those jobs
    according to the audits returned by ``objects.Audit.list``.
    """

    def __init__(self):
        super(ContinuousAuditHandler, self).__init__()
        # scheduler for executing audits
        self._audit_scheduler = None
        # scheduler for a periodic task to launch audit
        self._period_scheduler = None
        # Admin context created with show_deleted=True; used when an audit
        # is re-read in _is_audit_inactive().
        self.context_show_deleted = context.RequestContext(
            is_admin=True, show_deleted=True)

    @property
    def scheduler(self):
        """Return the audit-job scheduler, creating it on first access."""
        if self._audit_scheduler is None:
            self._audit_scheduler = scheduling.BackgroundSchedulerService(
                jobstores={
                    'default': job_store.WatcherJobStore(
                        engine=sq_api.enginefacade.writer.get_engine()),
                }
            )
        return self._audit_scheduler

    @property
    def period_scheduler(self):
        """Return the periodic-task scheduler, creating it on first access."""
        if self._period_scheduler is None:
            self._period_scheduler = scheduling.BackgroundSchedulerService()
        return self._period_scheduler

    def _is_audit_inactive(self, audit):
        """Tell whether ``audit`` must no longer be run by this handler.

        The audit is re-read by UUID with ``self.context_show_deleted``.
        It is considered inactive when the state transition manager reports
        it inactive, when its hostname differs from ``CONF.host``, or when
        :meth:`check_audit_expired` returns True.

        Side effect: when the audit is inactive, the first ``execute_audit``
        job found for it in ``self.scheduler`` is removed.

        :param audit: audit object; only its ``uuid`` is read here
        :returns: True if the audit is inactive, False otherwise
        """
        audit = objects.Audit.get_by_uuid(
            self.context_show_deleted, audit.uuid, eager=True)
        # The evaluation order matters: check_audit_expired() may save the
        # audit, so it is only reached when the cheaper checks are False.
        inactive = (
            objects.audit.AuditStateTransitionManager().is_inactive(audit) or
            audit.hostname != CONF.host or
            self.check_audit_expired(audit))
        if not inactive:
            return False

        # if audit isn't in active states, audit's job must be removed to
        # prevent using of inactive audit in future.
        jobs = [job for job in self.scheduler.get_jobs()
                if job.name == 'execute_audit' and
                job.args[0].uuid == audit.uuid]
        if jobs:
            jobs[0].remove()
        return True

    def do_execute(self, audit, request_context):
        """Run the parent ``do_execute`` and cancel RECOMMENDED plans.

        For an audit whose type is CONTINUOUS, every action plan listed for
        this audit in the RECOMMENDED state is switched to CANCELLED and
        saved.

        :param audit: audit being executed
        :param request_context: context passed to the parent and to
            ``objects.ActionPlan.list``
        :returns: the value returned by the parent ``do_execute``
        """
        solution = super(ContinuousAuditHandler, self).do_execute(
            audit, request_context)

        if audit.audit_type == objects.audit.AuditType.CONTINUOUS.value:
            a_plan_filters = {'audit_uuid': audit.uuid,
                              'state': objects.action_plan.State.RECOMMENDED}
            action_plans = objects.ActionPlan.list(
                request_context, filters=a_plan_filters, eager=True)
            for plan in action_plans:
                plan.state = objects.action_plan.State.CANCELLED
                plan.save()
        return solution

    @staticmethod
    def _next_cron_time(audit):
        """Return the next run time of a cron-like ``audit.interval``.

        :returns: the next ``datetime.datetime`` computed by ``croniter``
            starting from ``timeutils.utcnow()``, or None when
            ``audit.interval`` is not cron-like
        """
        if utils.is_cron_like(audit.interval):
            return croniter(audit.interval, timeutils.utcnow()
                            ).get_next(datetime.datetime)
        return None

    @classmethod
    def execute_audit(cls, audit, request_context):
        """Scheduler entry point: execute ``audit`` unless it is inactive.

        A fresh handler instance is created for every call. Whether the
        execution succeeds or raises, ``audit.next_run_time`` is recomputed
        and the audit is saved; any exception from ``execute`` propagates.
        """
        handler = cls()
        if handler._is_audit_inactive(audit):
            return
        try:
            handler.execute(audit, request_context)
        finally:
            if utils.is_int_like(audit.interval):
                audit.next_run_time = (
                    timeutils.utcnow() +
                    datetime.timedelta(seconds=int(audit.interval)))
            else:
                audit.next_run_time = handler._next_cron_time(audit)
            audit.save()

    def _add_job(self, trigger, audit, audit_context, **trigger_args):
        """Register an ``execute_audit`` job for ``audit``.

        :param trigger: trigger name handed to ``scheduler.add_job``
        :param audit: audit passed as first job argument
        :param audit_context: context passed as second job argument
        :param trigger_args: trigger keyword arguments; must contain either
            ``next_run_time`` or ``run_date`` (a KeyError is raised
            otherwise)
        """
        time_var = 'next_run_time' if trigger_args.get(
            'next_run_time') else 'run_date'
        # We should convert UTC time to local time without tzinfo
        trigger_args[time_var] = trigger_args[time_var].replace(
            tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
        self.scheduler.add_job(self.execute_audit, trigger,
                               args=[audit, audit_context],
                               name='execute_audit',
                               **trigger_args)

    def check_audit_expired(self, audit):
        """Tell whether ``audit`` is outside its start/end time frame.

        Side effect: when ``end_time`` has passed and the audit is not
        already SUCCEEDED, its state is set to SUCCEEDED and it is saved.

        :returns: True if ``start_time`` is still in the future or
            ``end_time`` is in the past, False otherwise
        """
        current = timeutils.utcnow()
        # Note: if audit still didn't get into the timeframe,
        # skip it
        if audit.start_time and audit.start_time > current:
            return True
        if audit.end_time and audit.end_time < current:
            if audit.state != objects.audit.State.SUCCEEDED:
                audit.state = objects.audit.State.SUCCEEDED
                audit.save()
            return True

        return False

    def launch_audits_periodically(self):
        """Synchronise scheduled jobs with the current continuous audits.

        Steps, in order:

        1. restart the audit scheduler if it is not running;
        2. assign ``CONF.host`` to PENDING/ONGOING continuous audits that
           have no hostname yet;
        3. drop jobs whose audit became inactive;
        4. for every PENDING/ONGOING continuous audit of this host that is
           not expired, add a job if none exists, or replace the job if
           the audit interval changed.
        """
        # if audit scheduler stop, restart it
        if not self.scheduler.running:
            self.scheduler.start()

        audit_context = context.RequestContext(is_admin=True)
        audit_filters = {
            'audit_type': objects.audit.AuditType.CONTINUOUS.value,
            'state__in': (objects.audit.State.PENDING,
                          objects.audit.State.ONGOING),
        }
        audit_filters['hostname'] = None
        unscheduled_audits = objects.Audit.list(
            audit_context, filters=audit_filters, eager=True)
        for audit in unscheduled_audits:
            # If continuous audit doesn't have a hostname yet,
            # Watcher will set current CONF.host value.
            # TODO(alexchadin): Add scheduling of new continuous audits.
            audit.hostname = CONF.host
            audit.save()

        scheduler_jobs = {
            job.args[0].uuid: job
            for job in self.scheduler.get_jobs()
            if job.name == 'execute_audit'}
        # if audit isn't in active states, audit's job should be removed.
        # UUIDs are collected first so the dict is not mutated while it is
        # being iterated.
        jobs_to_remove = []
        for job in scheduler_jobs.values():
            if self._is_audit_inactive(job.args[0]):
                jobs_to_remove.append(job.args[0].uuid)
        for audit_uuid in jobs_to_remove:
            scheduler_jobs.pop(audit_uuid)

        audit_filters['hostname'] = CONF.host
        audits = objects.Audit.list(
            audit_context, filters=audit_filters, eager=True)
        for audit in audits:
            if self.check_audit_expired(audit):
                continue
            existing_job = scheduler_jobs.get(audit.uuid)
            # if audit is already in the job queue and its interval has not
            # changed, there is nothing to do.
            if (existing_job is not None and
                    audit.interval == existing_job.args[0].interval):
                continue
            # if audit is not presented in scheduled audits yet,
            # just add a new audit job.
            # if audit is already in the job queue, and interval has changed,
            # we need to remove the old job and add a new one.
            if existing_job is not None:
                self.scheduler.remove_job(existing_job.id)

            # if interval is provided with seconds
            if utils.is_int_like(audit.interval):
                interval = int(audit.interval)
                # if audit has already been provided and we need
                # to restore it after shutdown
                if audit.next_run_time is not None:
                    old_run_time = audit.next_run_time
                    current = timeutils.utcnow()
                    if old_run_time < current:
                        # total_seconds() is used instead of .seconds so
                        # that whole days of downtime are taken into
                        # account when realigning to the old schedule.
                        elapsed = int(
                            (current - old_run_time).total_seconds())
                        delta = datetime.timedelta(
                            seconds=interval - elapsed % interval)
                        audit.next_run_time = current + delta
                    next_run_time = audit.next_run_time
                # if audit is new one
                else:
                    next_run_time = timeutils.utcnow()
                self._add_job('interval', audit, audit_context,
                              seconds=interval,
                              next_run_time=next_run_time)
            else:
                audit.next_run_time = self._next_cron_time(audit)
                self._add_job('date', audit, audit_context,
                              run_date=audit.next_run_time)
            audit.hostname = CONF.host
            audit.save()

    def start(self):
        """Start both schedulers.

        A periodic ``launch_audits_periodically`` job is registered with an
        interval of ``CONF.watcher_decision_engine.continuous_audit_interval``
        seconds and a first run time of ``datetime.datetime.now()``.
        """
        self.period_scheduler.add_job(
            self.launch_audits_periodically,
            'interval',
            seconds=CONF.watcher_decision_engine.continuous_audit_interval,
            next_run_time=datetime.datetime.now())
        self.period_scheduler.start()
        # audit scheduler start
        self.scheduler.start()

229 # audit scheduler start 

230 self.scheduler.start()