Coverage for watcher/db/sqlalchemy/job_store.py: 45%
54 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright (c) 2017 Servionica LTD
3#
4# Authors: Alexander Chadin <a.chadin@servionica.ru>
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
19import pickle # nosec: B403
21from apscheduler.jobstores.base import ConflictingIdError
22from apscheduler.jobstores import sqlalchemy
23from apscheduler.util import datetime_to_utc_timestamp
24from apscheduler.util import maybe_ref
25from apscheduler.util import utc_timestamp_to_datetime
26from oslo_serialization import jsonutils
28from watcher.common import context
29from watcher.common import service
30from watcher import objects
32from sqlalchemy import Table, MetaData, select, and_, null
33from sqlalchemy.exc import IntegrityError
class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
    """Stores jobs in a database table using SQLAlchemy.

    The table will be created if it doesn't exist in the database.

    Plugin alias: ``sqlalchemy``

    :param str url: connection string
    :param engine: an SQLAlchemy Engine to use instead of creating a new
        one based on ``url``
    :param str tablename: name of the table to store jobs in
    :param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead of
        creating a new one
    :param int pickle_protocol: pickle protocol level to use
        (for serialization), defaults to the highest available
    :param dict tag: tag description
    """

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
                 metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
                 tag=None):
        super(WatcherJobStore, self).__init__(url, engine, tablename,
                                              metadata, pickle_protocol)
        metadata = maybe_ref(metadata) or MetaData()
        self.jobs_t = Table(tablename, metadata, autoload_with=engine)
        # Identify this store by the running service's (host, name) pair
        # unless an explicit tag was supplied; jobs are filtered on it so
        # each service only sees its own jobs.
        service_ident = service.ServiceHeartbeat.get_service_name()
        self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
        self.service_id = objects.Service.list(context=context.make_context(),
                                               filters=self.tag)[0].id

    def start(self, scheduler, alias):
        """Start the job store.

        Deliberately bypasses SQLAlchemyJobStore.start (which would try to
        create the jobs table) and calls BaseJobStore.start directly.
        Naming the parent class explicitly is safer than the previous
        ``self.__class__.__bases__[0]``, which would break if this class
        were ever subclassed.
        """
        super(sqlalchemy.SQLAlchemyJobStore, self).start(scheduler, alias)

    def add_job(self, job):
        """Insert *job* into the table, tagged with this service.

        :raises ConflictingIdError: if a job with the same id already
            exists (mapped from the database IntegrityError).
        """
        insert = self.jobs_t.insert().values(**{
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(),
                                      self.pickle_protocol),
            'service_id': self.service_id,
            'tag': jsonutils.dumps(self.tag)
        })
        try:
            with self.engine.begin() as conn:
                conn.execute(insert)
        except IntegrityError:
            raise ConflictingIdError(job.id)

    def get_all_jobs(self):
        """Return all restorable jobs belonging to this store's tag."""
        jobs = self._get_jobs(self.jobs_t.c.tag == jsonutils.dumps(self.tag))
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def get_next_run_time(self):
        """Return the earliest next_run_time in the table, or None."""
        selectable = select(self.jobs_t.c.next_run_time).\
            where(self.jobs_t.c.next_run_time != null()).\
            order_by(self.jobs_t.c.next_run_time).limit(1)
        with self.engine.begin() as connection:
            # NOTE(danms): The apscheduler implementation of this gets a
            # decimal.Decimal back from scalar() which causes
            # utc_timestamp_to_datetime() to choke since it is expecting a
            # python float. Assume this is SQLAlchemy 2.0 stuff, so just
            # coerce to a float here.
            next_run_time = connection.execute(selectable).scalar()
            return utc_timestamp_to_datetime(float(next_run_time)
                                             if next_run_time is not None
                                             else None)

    def _get_jobs(self, *conditions):
        """Load and unpickle this service's jobs matching *conditions*.

        Jobs whose pickled state cannot be restored are logged and
        deleted from the table.
        """
        jobs = []
        conditions += (self.jobs_t.c.service_id == self.service_id,)
        selectable = select(
            self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag
        ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
        failed_job_ids = set()
        with self.engine.begin() as conn:
            for row in conn.execute(selectable):
                try:
                    jobs.append(self._reconstitute_job(row.job_state))
                except Exception:
                    self._logger.exception(
                        'Unable to restore job "%s" -- removing it', row.id)
                    failed_job_ids.add(row.id)

        # Remove all the jobs we failed to restore.
        if failed_job_ids:
            delete = self.jobs_t.delete().where(
                self.jobs_t.c.id.in_(failed_job_ids))
            # Engine.execute() was removed in SQLAlchemy 2.0 (which the
            # rest of this module already targets), so run the delete
            # inside an explicit transaction instead of the previous
            # self.engine.execute(delete).
            with self.engine.begin() as conn:
                conn.execute(delete)

        return jobs