Coverage for watcher/common/service.py: 93%
151 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2#
3# Copyright © 2012 eNovance <licensing@enovance.com>
4##
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17import datetime
18import socket
20from oslo_concurrency import processutils
21from oslo_config import cfg
22from oslo_log import _options
23from oslo_log import log
24import oslo_messaging as messaging
25from oslo_reports import guru_meditation_report as gmr
26from oslo_reports import opts as gmr_opts
27from oslo_service import service
28from oslo_service import wsgi
29from oslo_utils import timeutils
31from watcher._i18n import _
32from watcher.api import app
33from watcher.common import config
34from watcher.common import context
35from watcher.common import rpc
36from watcher.common import scheduling
37from watcher.conf import plugins as plugins_conf
38from watcher import objects
39from watcher.objects import base
40from watcher.objects import fields as wfields
41from watcher import version
# Options controlling the minimum priority of emitted notifications.
# The empty-string choice is listed first, ahead of every known priority.
NOTIFICATION_OPTS = [
    cfg.StrOpt(
        'notification_level',
        choices=['', *wfields.NotificationPriority.ALL],
        default=wfields.NotificationPriority.INFO,
        help=_('Specifies the minimum level for which to send '
               'notifications. If not set, no notifications will '
               'be sent. The default is for this option to be at the '
               '`INFO` level.'),
    ),
]
cfg.CONF.register_opts(NOTIFICATION_OPTS)


# Module-wide shortcuts to the global configuration object and the logger.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Per-library log levels installed as defaults by prepare_service().
_DEFAULT_LOG_LEVELS = [
    'amqp=WARN',
    'amqplib=WARN',
    'qpid.messaging=INFO',
    'oslo.messaging=INFO',
    'sqlalchemy=WARN',
    'keystoneclient=INFO',
    'stevedore=INFO',
    'eventlet.wsgi.server=WARN',
    'iso8601=WARN',
    'requests=WARN',
    'neutronclient=WARN',
    'glanceclient=WARN',
    'apscheduler=WARN',
]

# Re-export so callers can reach it through this module.
Singleton = service.Singleton
class WSGIService(service.ServiceBase):
    """Service wrapper that serves the Watcher API from a WSGI application."""

    def __init__(self, service_name, use_ssl=False):
        """Build the WSGI server; the server is not started here.

        :param service_name: The service name of the WSGI server; it is also
            used as the server's logger name.
        :param use_ssl: Wraps the socket in an SSL context if True.
        """
        self.service_name = service_name
        self.app = app.VersionSelectorApplication()

        # A falsy configured worker count falls back to the value reported
        # by processutils.
        configured_workers = CONF.api.workers
        self.workers = configured_workers or processutils.get_worker_count()

        server_options = dict(
            host=CONF.api.host,
            port=CONF.api.port,
            use_ssl=use_ssl,
            logger_name=self.service_name,
        )
        self.server = wsgi.Server(
            CONF, self.service_name, self.app, **server_options)

    def start(self):
        """Begin serving by starting the underlying WSGI server."""
        self.server.start()

    def stop(self):
        """Stop the underlying WSGI server."""
        self.server.stop()

    def wait(self):
        """Delegate to the underlying WSGI server's wait()."""
        self.server.wait()

    def reset(self):
        """Reset server greenpool size to default."""
        self.server.reset()
class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
    """Scheduler service that records this service as alive.

    A beat is sent once on construction and then, once the service has been
    started, every 60 seconds through a scheduled job.
    """

    # Stored on the class (not the instance): __init__ assigns it via
    # ``ServiceHeartbeat.service_name`` and get_service_name() reads it
    # from the class, so the value is shared by every instance.
    service_name = None

    def __init__(self, gconfig=None, service_name=None, **kwargs):
        """Initialise the scheduler and send a first beat immediately.

        :param gconfig: Scheduler configuration forwarded to the parent
            class; a falsy value is replaced by an empty dict.
        :param service_name: Name under which beats are recorded.
        :param kwargs: Extra keyword arguments forwarded to the parent class.
        """
        # Honour the caller-supplied configuration; the previous
        # ``None or {}`` always evaluated to ``{}`` and discarded it.
        gconfig = gconfig or {}
        super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
        ServiceHeartbeat.service_name = service_name
        self.context = context.make_context()
        self.send_beat()

    def send_beat(self):
        """Refresh ``last_seen_up`` for this service, creating it if absent.

        The record is looked up by service name and ``CONF.host``. When at
        least one match exists the first one is updated and saved; otherwise
        a new record carrying the name and host is created.
        """
        host = CONF.host
        watcher_list = objects.Service.list(
            self.context, filters={'name': ServiceHeartbeat.service_name,
                                   'host': host})
        if watcher_list:
            watcher_service = watcher_list[0]
            watcher_service.last_seen_up = timeutils.utcnow()
            watcher_service.save()
        else:
            watcher_service = objects.Service(self.context)
            watcher_service.name = ServiceHeartbeat.service_name
            watcher_service.host = host
            watcher_service.create()

    def add_heartbeat_job(self):
        """Schedule send_beat every 60 seconds, with the first run due now."""
        self.add_job(self.send_beat, 'interval', seconds=60,
                     next_run_time=datetime.datetime.now())

    @classmethod
    def get_service_name(cls):
        """Return a ``(CONF.host, service_name)`` tuple."""
        return CONF.host, cls.service_name

    def start(self):
        """Start service."""
        self.add_heartbeat_job()
        super(ServiceHeartbeat, self).start()

    def stop(self):
        """Stop service."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """
class Service(service.ServiceBase):
    """Service driven by a manager object.

    The manager supplies the topics, endpoints, API version and service name
    from which the RPC server, the notification listener and the heartbeat
    are built. Each of those is created only when its inputs are truthy.
    """

    API_VERSION = '1.0'

    def __init__(self, manager_class):
        """Instantiate the manager and build handlers from its attributes.

        :param manager_class: Callable invoked with no arguments to create
            the manager. The manager must expose ``publisher_id``,
            ``api_version``, ``conductor_topic``, ``notification_topics``,
            ``service_name``, ``conductor_endpoints`` and
            ``notification_endpoints``.
        """
        super(Service, self).__init__()
        self.manager = manager_class()

        self.publisher_id = self.manager.publisher_id
        self.api_version = self.manager.api_version

        self.conductor_topic = self.manager.conductor_topic
        self.notification_topics = self.manager.notification_topics

        self.heartbeat = None

        self.service_name = self.manager.service_name
        if self.service_name:
            self.heartbeat = ServiceHeartbeat(
                service_name=self.manager.service_name)

        # Conductor endpoints are given as callables; each one is
        # instantiated with this service as its only argument.
        self.conductor_endpoints = [
            ep(self) for ep in self.manager.conductor_endpoints
        ]
        self.notification_endpoints = self.manager.notification_endpoints

        # Backing attribute for the lazily created ``conductor_client``.
        self._conductor_client = None

        self.conductor_topic_handler = None
        self.notification_handler = None

        if self.conductor_topic and self.conductor_endpoints:
            self.conductor_topic_handler = self.build_topic_handler(
                self.conductor_topic, self.conductor_endpoints)
        if self.notification_topics and self.notification_endpoints:
            self.notification_handler = self.build_notification_handler(
                self.notification_topics, self.notification_endpoints
            )

    @property
    def conductor_client(self):
        """RPC client for the conductor topic, created on first access."""
        if self._conductor_client is None:
            target = messaging.Target(
                topic=self.conductor_topic,
                version=self.API_VERSION,
            )
            self._conductor_client = rpc.get_client(
                target,
                serializer=base.WatcherObjectSerializer()
            )
        return self._conductor_client

    @conductor_client.setter
    def conductor_client(self, c):
        """Replace the cached conductor client with ``c``."""
        # Assign the backing attribute: assigning to ``self.conductor_client``
        # here would re-enter this setter and recurse without end.
        self._conductor_client = c

    def build_topic_handler(self, topic_name, endpoints=()):
        """Build an RPC server for ``topic_name``.

        :param topic_name: Topic the server target is bound to.
        :param endpoints: Endpoint objects handed to the RPC server.
        :returns: The value returned by ``rpc.get_server``.
        """
        target = messaging.Target(
            topic=topic_name,
            # For compatibility, we can override it with 'host' opt
            server=CONF.host or socket.gethostname(),
            version=self.api_version,
        )
        return rpc.get_server(
            target, endpoints,
            serializer=rpc.JsonPayloadSerializer()
        )

    def build_notification_handler(self, topic_names, endpoints=()):
        """Build a notification listener covering ``topic_names``.

        A name containing a dot is split into ``exchange.topic``; a name
        without a dot is used as the topic with no explicit exchange.

        :param topic_names: Iterable of topic names.
        :param endpoints: Endpoint objects handed to the listener.
        :returns: The value returned by ``rpc.get_notification_listener``.
        :raises ValueError: If a name contains more than one dot, because
            the split result is unpacked into exactly two names.
        """
        targets = []
        for topic in topic_names:
            kwargs = {}
            if '.' in topic:
                exchange, topic = topic.split('.')
                kwargs['exchange'] = exchange
            kwargs['topic'] = topic
            targets.append(messaging.Target(**kwargs))

        return rpc.get_notification_listener(
            targets, endpoints,
            serializer=rpc.JsonPayloadSerializer(),
            pool=CONF.host
        )

    def start(self):
        """Start every handler and the heartbeat that were created."""
        LOG.debug("Connecting to '%s'", CONF.transport_url)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.start()
        if self.notification_handler:
            self.notification_handler.start()
        if self.heartbeat:
            self.heartbeat.start()

    def stop(self):
        """Stop every handler and the heartbeat that were created."""
        LOG.debug("Disconnecting from '%s'", CONF.transport_url)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.stop()
        if self.notification_handler:
            self.notification_handler.stop()
        if self.heartbeat:
            self.heartbeat.stop()

    def reset(self):
        """Reset a service in case it received a SIGHUP."""

    def wait(self):
        """Wait for service to complete."""

    def check_api_version(self, ctx):
        """Call ``check_api_version`` on the conductor and return its reply.

        :param ctx: Request context passed to the RPC call.
        :returns: The result of the ``check_api_version`` RPC call, made
            with this service's ``api_version``.
        """
        api_manager_version = self.conductor_client.call(
            ctx, 'check_api_version', api_version=self.api_version)
        return api_manager_version
def launch(conf, service_, workers=1, restart_method='mutate'):
    """Launch ``service_`` through ``oslo_service.service.launch``.

    All four arguments are forwarded positionally, in the order given.

    :param conf: Configuration object handed to the launcher.
    :param service_: Service instance to launch.
    :param workers: Worker count forwarded to the launcher.
    :param restart_method: Restart method forwarded to the launcher.
    :returns: Whatever ``service.launch`` returns.
    """
    launcher = service.launch(conf, service_, workers, restart_method)
    return launcher
def prepare_service(argv=(), conf=cfg.CONF):
    """Set up options, logging, objects and guru meditation reports.

    The calls below are order-dependent and are kept in their original
    sequence.

    :param argv: Command-line arguments handed to ``config.parse_args``.
    :param conf: Configuration object used for logging and report setup.
        Note that ``config.parse_args`` receives only ``argv``, not ``conf``.
    """
    # Register logging and report options before the arguments are parsed.
    log.register_options(conf)
    gmr_opts.set_defaults(conf)

    config.parse_args(argv)

    # Install this module's per-library log levels as defaults, then set up
    # logging and dump the option values at DEBUG level.
    cfg.set_defaults(_options.log_opts,
                     default_log_levels=_DEFAULT_LOG_LEVELS)
    log.setup(conf, 'python-watcher')
    conf.log_opt_values(LOG, log.DEBUG)

    objects.register_all()

    # Add a 'Plugins' section to the report, then enable automatic reports.
    guru = gmr.TextGuruMeditation
    guru.register_section(_('Plugins'), plugins_conf.show_plugins)
    guru.setup_autorun(version, conf=conf)