Coverage for watcher/db/sqlalchemy/job_store.py: 45%

54 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2017 Servionica LTD 

3# 

4# Authors: Alexander Chadin <a.chadin@servionica.ru> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18 

19import pickle # nosec: B403 

20 

21from apscheduler.jobstores.base import ConflictingIdError 

22from apscheduler.jobstores import sqlalchemy 

23from apscheduler.util import datetime_to_utc_timestamp 

24from apscheduler.util import maybe_ref 

25from apscheduler.util import utc_timestamp_to_datetime 

26from oslo_serialization import jsonutils 

27 

28from watcher.common import context 

29from watcher.common import service 

30from watcher import objects 

31 

32from sqlalchemy import Table, MetaData, select, and_, null 

33from sqlalchemy.exc import IntegrityError 

34 

35 

class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
    """Store APScheduler jobs in a database table, scoped to one service.

    Every stored row carries, in addition to the job id, next run time and
    pickled job state, the id of the owning Watcher service and a
    JSON-encoded ``tag``. Job listings are restricted to rows that belong
    to this store's service.

    The jobs table is reflected from the database (``autoload_with``), so
    its columns -- including ``service_id`` and ``tag`` -- come from the
    existing schema rather than from a definition in this class.

    :param str url: connection string
    :param engine: an SQLAlchemy Engine to use instead of creating a new
        one based on ``url``
    :param str tablename: name of the table to store jobs in
    :param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead
        of creating a new one
    :param int pickle_protocol: pickle protocol level to use
        (for serialization), defaults to the highest available
    :param dict tag: filters identifying the owning service; defaults to
        ``{'host': ..., 'name': ...}`` built from
        ``ServiceHeartbeat.get_service_name()``
    """

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
                 metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
                 tag=None):
        super(WatcherJobStore, self).__init__(url, engine, tablename,
                                              metadata, pickle_protocol)
        metadata = maybe_ref(metadata) or MetaData()
        # Reflect through self.engine (the engine every other method of this
        # class uses) instead of the raw ``engine`` argument, which is None
        # when the store is configured with ``url`` only.
        self.jobs_t = Table(tablename, metadata, autoload_with=self.engine)
        service_ident = service.ServiceHeartbeat.get_service_name()
        self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
        # The first service matching the tag owns the jobs of this store.
        # NOTE(review): the [0] lookup fails if no service matches the tag
        # -- presumably the service is always registered beforehand; confirm.
        self.service_id = objects.Service.list(context=context.make_context(),
                                               filters=self.tag)[0].id

    def start(self, scheduler, alias):
        """Start the store, bypassing ``SQLAlchemyJobStore.start``.

        The ``start`` implementation that follows ``SQLAlchemyJobStore`` in
        the MRO is called instead of the direct parent's one.

        :param scheduler: the scheduler that is starting this job store
        :param str alias: alias of this job store as assigned by the
            scheduler
        """
        # Name the class to skip statically: deriving it from
        # ``self.__class__`` would resolve to the wrong class as soon as
        # WatcherJobStore itself is subclassed.
        super(sqlalchemy.SQLAlchemyJobStore, self).start(scheduler, alias)

    def add_job(self, job):
        """Insert ``job`` into the jobs table for this store's service.

        :param job: the job to persist
        :raises ConflictingIdError: if the insert violates an integrity
            constraint (reported with ``job.id``)
        """
        insert = self.jobs_t.insert().values(**{
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(),
                                      self.pickle_protocol),
            'service_id': self.service_id,
            'tag': jsonutils.dumps(self.tag)
        })
        try:
            with self.engine.begin() as conn:
                conn.execute(insert)
        except IntegrityError:
            raise ConflictingIdError(job.id)

    def get_all_jobs(self):
        """Return the jobs whose stored tag equals this store's tag.

        :returns: list of restored jobs, passed through
            ``_fix_paused_jobs_sorting`` before being returned
        """
        jobs = self._get_jobs(self.jobs_t.c.tag == jsonutils.dumps(self.tag))
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def get_next_run_time(self):
        """Return the earliest non-null ``next_run_time`` in the table.

        The query is not restricted by service or tag.

        :returns: the result of ``utc_timestamp_to_datetime`` for the
            smallest stored timestamp, or for ``None`` when no row has a
            next run time
        """
        selectable = select(self.jobs_t.c.next_run_time).\
            where(self.jobs_t.c.next_run_time != null()).\
            order_by(self.jobs_t.c.next_run_time).limit(1)
        with self.engine.begin() as connection:
            # NOTE(danms): The apscheduler implementation of this gets a
            # decimal.Decimal back from scalar() which causes
            # utc_timestamp_to_datetime() to choke since it is expecting a
            # python float. Assume this is SQLAlchemy 2.0 stuff, so just
            # coerce to a float here.
            next_run_time = connection.execute(selectable).scalar()
            return utc_timestamp_to_datetime(float(next_run_time)
                                             if next_run_time is not None
                                             else None)

    def _get_jobs(self, *conditions):
        """Restore the jobs of this service that match ``conditions``.

        Rows whose job state cannot be restored are logged and deleted from
        the table.

        :param conditions: extra SQLAlchemy filter expressions, AND-ed with
            the ``service_id`` restriction
        :returns: list of restored jobs ordered by ``next_run_time``
        """
        jobs = []
        # Always scope the query to the service that owns this store.
        conditions += (self.jobs_t.c.service_id == self.service_id,)
        selectable = select(
            self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag
        ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
        failed_job_ids = set()
        with self.engine.begin() as conn:
            for row in conn.execute(selectable):
                try:
                    jobs.append(self._reconstitute_job(row.job_state))
                except Exception:
                    self._logger.exception(
                        'Unable to restore job "%s" -- removing it', row.id)
                    failed_job_ids.add(row.id)

            # Remove all the jobs we failed to restore. The delete runs on
            # the open connection: Engine.execute() no longer exists in
            # SQLAlchemy 2.0, and this keeps the cleanup in the same
            # transaction as the read.
            if failed_job_ids:
                delete = self.jobs_t.delete().where(
                    self.jobs_t.c.id.in_(failed_job_ids))
                conn.execute(delete)

        return jobs