Coverage for watcher/common/service.py: 93%

151 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# 

3# Copyright © 2012 eNovance <licensing@enovance.com> 

4## 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import datetime 

18import socket 

19 

20from oslo_concurrency import processutils 

21from oslo_config import cfg 

22from oslo_log import _options 

23from oslo_log import log 

24import oslo_messaging as messaging 

25from oslo_reports import guru_meditation_report as gmr 

26from oslo_reports import opts as gmr_opts 

27from oslo_service import service 

28from oslo_service import wsgi 

29from oslo_utils import timeutils 

30 

31from watcher._i18n import _ 

32from watcher.api import app 

33from watcher.common import config 

34from watcher.common import context 

35from watcher.common import rpc 

36from watcher.common import scheduling 

37from watcher.conf import plugins as plugins_conf 

38from watcher import objects 

39from watcher.objects import base 

40from watcher.objects import fields as wfields 

41from watcher import version 

42 

43 

# Option selecting the minimum priority of the notifications that are sent.
# The empty string is accepted in addition to every NotificationPriority
# value.
NOTIFICATION_OPTS = [
    cfg.StrOpt(
        'notification_level',
        default=wfields.NotificationPriority.INFO,
        choices=[''] + list(wfields.NotificationPriority.ALL),
        help=_('Specifies the minimum level for which to send '
               'notifications. If not set, no notifications will '
               'be sent. The default is for this option to be at the '
               '`INFO` level.'),
    ),
]
cfg.CONF.register_opts(NOTIFICATION_OPTS)

54 

55 

# Module-wide handles on the global configuration object and this module's
# logger.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# "<logger name>=<level>" pairs; prepare_service() installs them as the
# default value of the ``default_log_levels`` logging option.
_DEFAULT_LOG_LEVELS = [
    'amqp=WARN',
    'amqplib=WARN',
    'qpid.messaging=INFO',
    'oslo.messaging=INFO',
    'sqlalchemy=WARN',
    'keystoneclient=INFO',
    'stevedore=INFO',
    'eventlet.wsgi.server=WARN',
    'iso8601=WARN',
    'requests=WARN',
    'neutronclient=WARN',
    'glanceclient=WARN',
    'apscheduler=WARN',
]

# Alias of the Singleton helper provided by oslo.service.
Singleton = service.Singleton

68 

69 

class WSGIService(service.ServiceBase):
    """Launch the Watcher API as a WSGI service."""

    def __init__(self, service_name, use_ssl=False):
        """Build the WSGI server without starting it.

        :param service_name: Name given to the WSGI server and to its logger.
        :param use_ssl: When True the socket is wrapped in an SSL context.
        """
        self.service_name = service_name
        self.app = app.VersionSelectorApplication()

        # Use the configured worker count, or ask processutils for one when
        # the option is unset or zero.
        worker_count = CONF.api.workers
        if not worker_count:
            worker_count = processutils.get_worker_count()
        self.workers = worker_count

        server_options = {
            'host': CONF.api.host,
            'port': CONF.api.port,
            'use_ssl': use_ssl,
            'logger_name': self.service_name,
        }
        self.server = wsgi.Server(
            CONF, self.service_name, self.app, **server_options)

    def start(self):
        """Begin serving requests with the wrapped WSGI server."""
        self.server.start()

    def stop(self):
        """Stop the wrapped WSGI server."""
        self.server.stop()

    def wait(self):
        """Block until the wrapped WSGI server has finished serving."""
        self.server.wait()

    def reset(self):
        """Delegate a reset request to the wrapped WSGI server."""
        self.server.reset()

104 

105 

class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
    """Background scheduler that records when this service was last seen.

    A "beat" refreshes ``last_seen_up`` on the ``objects.Service`` record
    matching the service name and ``CONF.host``, or creates that record when
    none exists. One beat is sent from the constructor; once the service is
    started a scheduler job repeats it every 60 seconds.
    """

    # Kept on the class rather than the instance so that the
    # ``get_service_name`` classmethod can read it. Every ``__init__`` call
    # overwrites it.
    service_name = None

    def __init__(self, gconfig=None, service_name=None, **kwargs):
        """Initialize the scheduler and send a first heartbeat.

        :param gconfig: Scheduler configuration forwarded to the parent
            class. An empty dict is used when it is omitted.
        :param service_name: Name under which the service is recorded.
        :param kwargs: Extra keyword arguments forwarded to the parent class.
        """
        # The previous ``None or {}`` always evaluated to ``{}`` and thereby
        # discarded any configuration supplied by the caller.
        gconfig = gconfig if gconfig is not None else {}
        super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
        ServiceHeartbeat.service_name = service_name
        self.context = context.make_context()
        self.send_beat()

    def send_beat(self):
        """Update the matching Service record, or create it if missing."""
        host = CONF.host
        watcher_list = objects.Service.list(
            self.context, filters={'name': ServiceHeartbeat.service_name,
                                   'host': host})
        if watcher_list:
            # Only the first matching record is refreshed.
            watcher_service = watcher_list[0]
            watcher_service.last_seen_up = timeutils.utcnow()
            watcher_service.save()
        else:
            watcher_service = objects.Service(self.context)
            watcher_service.name = ServiceHeartbeat.service_name
            watcher_service.host = host
            watcher_service.create()

    def add_heartbeat_job(self):
        """Schedule ``send_beat`` every 60 seconds, first run right away."""
        self.add_job(self.send_beat, 'interval', seconds=60,
                     next_run_time=datetime.datetime.now())

    @classmethod
    def get_service_name(cls):
        """Return the ``(CONF.host, service_name)`` pair."""
        return CONF.host, cls.service_name

    def start(self):
        """Start service."""
        self.add_heartbeat_job()
        super(ServiceHeartbeat, self).start()

    def stop(self):
        """Stop service."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """

157 

158 

class Service(service.ServiceBase):
    """Messaging service assembled from the attributes of a manager.

    The manager supplies the topics and endpoints. From them this class
    builds an RPC server for the conductor topic, a notification listener
    for the notification topics and, when the manager has a service name,
    a :class:`ServiceHeartbeat`.
    """

    API_VERSION = '1.0'

    def __init__(self, manager_class):
        """Instantiate the manager and build the handlers it calls for.

        :param manager_class: Callable invoked without arguments to create
            the manager. The manager must expose ``publisher_id``,
            ``api_version``, ``conductor_topic``, ``notification_topics``,
            ``service_name``, ``conductor_endpoints`` and
            ``notification_endpoints``.
        """
        super(Service, self).__init__()
        self.manager = manager_class()

        self.publisher_id = self.manager.publisher_id
        self.api_version = self.manager.api_version

        self.conductor_topic = self.manager.conductor_topic
        self.notification_topics = self.manager.notification_topics

        self.heartbeat = None

        self.service_name = self.manager.service_name
        if self.service_name:
            self.heartbeat = ServiceHeartbeat(
                service_name=self.manager.service_name)

        # Conductor endpoints are given as callables and instantiated with
        # this service; notification endpoints are used as provided.
        self.conductor_endpoints = [
            ep(self) for ep in self.manager.conductor_endpoints
        ]
        self.notification_endpoints = self.manager.notification_endpoints

        self._conductor_client = None

        self.conductor_topic_handler = None
        self.notification_handler = None

        # A handler is only built when both its topic(s) and endpoints are
        # non-empty.
        if self.conductor_topic and self.conductor_endpoints:
            self.conductor_topic_handler = self.build_topic_handler(
                self.conductor_topic, self.conductor_endpoints)
        if self.notification_topics and self.notification_endpoints:
            self.notification_handler = self.build_notification_handler(
                self.notification_topics, self.notification_endpoints
            )

    @property
    def conductor_client(self):
        """RPC client for the conductor topic, created on first access."""
        if self._conductor_client is None:
            target = messaging.Target(
                topic=self.conductor_topic,
                version=self.API_VERSION,
            )
            self._conductor_client = rpc.get_client(
                target,
                serializer=base.WatcherObjectSerializer()
            )
        return self._conductor_client

    @conductor_client.setter
    def conductor_client(self, c):
        # Assign the backing field. Assigning to ``self.conductor_client``
        # here would re-enter this setter and recurse until RecursionError.
        self._conductor_client = c

    def build_topic_handler(self, topic_name, endpoints=()):
        """Create the RPC server listening on ``topic_name``.

        :param topic_name: Topic of the messaging target.
        :param endpoints: Endpoints handed to the RPC server.
        :returns: The value returned by ``rpc.get_server``.
        """
        target = messaging.Target(
            topic=topic_name,
            # For compatibility, we can override it with 'host' opt
            server=CONF.host or socket.gethostname(),
            version=self.api_version,
        )
        return rpc.get_server(
            target, endpoints,
            serializer=rpc.JsonPayloadSerializer()
        )

    def build_notification_handler(self, topic_names, endpoints=()):
        """Create the notification listener for ``topic_names``.

        A topic written as ``"<exchange>.<topic>"`` is split into an
        exchange and a topic; a topic without a dot is used as is.

        :param topic_names: Iterable of topic strings.
        :param endpoints: Endpoints handed to the listener.
        :returns: The value returned by ``rpc.get_notification_listener``.
        """
        targets = []
        for topic in topic_names:
            kwargs = {}
            if '.' in topic:
                # NOTE: a topic containing more than one '.' makes this
                # two-name unpacking raise ValueError.
                exchange, topic = topic.split('.')
                kwargs['exchange'] = exchange
            kwargs['topic'] = topic
            targets.append(messaging.Target(**kwargs))

        return rpc.get_notification_listener(
            targets, endpoints,
            serializer=rpc.JsonPayloadSerializer(),
            pool=CONF.host
        )

    def start(self):
        """Start every handler that was built, then the heartbeat."""
        LOG.debug("Connecting to '%s'", CONF.transport_url)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.start()
        if self.notification_handler:
            self.notification_handler.start()
        if self.heartbeat:
            self.heartbeat.start()

    def stop(self):
        """Stop every handler that was built, then the heartbeat."""
        LOG.debug("Disconnecting from '%s'", CONF.transport_url)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.stop()
        if self.notification_handler:
            self.notification_handler.stop()
        if self.heartbeat:
            self.heartbeat.stop()

    def reset(self):
        """Reset a service in case it received a SIGHUP."""

    def wait(self):
        """Wait for service to complete."""

    def check_api_version(self, ctx):
        """Call ``check_api_version`` on the conductor and return its reply.

        :param ctx: Request context passed to the RPC call.
        :returns: Whatever the remote ``check_api_version`` call returns.
        """
        api_manager_version = self.conductor_client.call(
            ctx, 'check_api_version', api_version=self.api_version)
        return api_manager_version

271 

272 

def launch(conf, service_, workers=1, restart_method='mutate'):
    """Hand a service over to oslo.service's ``launch`` helper.

    :param conf: Configuration object passed through unchanged.
    :param service_: Service instance to launch.
    :param workers: Number of workers, forwarded as given.
    :param restart_method: Restart method, forwarded as given.
    :returns: Whatever ``oslo_service.service.launch`` returns.
    """
    launcher = service.launch(conf, service_, workers, restart_method)
    return launcher

275 

276 

277def prepare_service(argv=(), conf=cfg.CONF): 

278 log.register_options(conf) 

279 gmr_opts.set_defaults(conf) 

280 

281 config.parse_args(argv) 

282 cfg.set_defaults(_options.log_opts, 

283 default_log_levels=_DEFAULT_LOG_LEVELS) 

284 log.setup(conf, 'python-watcher') 

285 conf.log_opt_values(LOG, log.DEBUG) 

286 objects.register_all() 

287 

288 gmr.TextGuruMeditation.register_section( 

289 _('Plugins'), plugins_conf.show_plugins) 

290 gmr.TextGuruMeditation.setup_autorun(version, conf=conf)