Coverage for watcher/common/rpc.py: 79%
70 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# Copyright 2014 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from oslo_config import cfg
17from oslo_log import log
18import oslo_messaging as messaging
20from oslo_messaging.rpc import dispatcher
22from watcher.common import context as watcher_context
23from watcher.common import exception
# Public API of this module.  'initialized' and 'get_notification_listener'
# are public functions defined in this file and are therefore listed here
# alongside the other entry points.
__all__ = [
    'init',
    'initialized',
    'cleanup',
    'set_defaults',
    'add_extra_exmods',
    'clear_extra_exmods',
    'get_allowed_exmods',
    'RequestContextSerializer',
    'get_client',
    'get_server',
    'get_notification_listener',
    'get_notifier',
]
# Global oslo.config handle and the logger for this module.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Messaging singletons: assigned by init() and set back to None by cleanup().
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None

# Module names handed to oslo.messaging as ``allowed_remote_exmods`` when the
# transports are created; see get_allowed_exmods().
ALLOWED_EXMODS = [exception.__name__]
# Additional module names registered at runtime via add_extra_exmods().
EXTRA_EXMODS = []

# Local alias for the oslo.messaging JSON payload serializer.
JsonPayloadSerializer = messaging.JsonPayloadSerializer
def init(conf):
    """Create the module-level transports and notifier.

    Sets the ``TRANSPORT``, ``NOTIFICATION_TRANSPORT`` and ``NOTIFIER``
    globals.

    :param conf: configuration object passed to oslo.messaging when the
                 transports are created.  Its ``notification_level``
                 attribute is read: when it is falsy the notifier is built
                 with the ``'noop'`` driver.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER

    allowed = get_allowed_exmods()
    TRANSPORT = messaging.get_rpc_transport(
        conf, allowed_remote_exmods=allowed)
    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf, allowed_remote_exmods=allowed)

    notifier_kwargs = {
        'serializer': RequestContextSerializer(JsonPayloadSerializer()),
    }
    if not conf.notification_level:
        # No notification level configured: use the 'noop' driver.
        notifier_kwargs['driver'] = 'noop'
    NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, **notifier_kwargs)
def initialized():
    """Return True when both ``TRANSPORT`` and ``NOTIFIER`` are set.

    ``NOTIFICATION_TRANSPORT`` is not part of this check.
    """
    required = [TRANSPORT, NOTIFIER]
    return not (None in required)
def cleanup():
    """Release the messaging transports and reset the module globals.

    Calls ``cleanup()`` on ``TRANSPORT`` and ``NOTIFICATION_TRANSPORT`` when
    they are set, then resets ``TRANSPORT``, ``NOTIFICATION_TRANSPORT`` and
    ``NOTIFIER`` to ``None``.

    Calling this while the module is not initialised is reported through the
    log and is otherwise a no-op; it no longer fails with ``AttributeError``
    on a ``None`` transport.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    if NOTIFIER is None:
        # LOG.exception() is only meaningful inside an ``except`` block (it
        # appends the active traceback); there is none here, so log a plain
        # error instead.
        LOG.error("RPC cleanup: NOTIFIER is None")
    # Only touch transports that were actually created.
    if TRANSPORT is not None:
        TRANSPORT.cleanup()
    if NOTIFICATION_TRANSPORT is not None:
        NOTIFICATION_TRANSPORT.cleanup()
    TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
    """Forward ``control_exchange`` to oslo.messaging's transport defaults.

    :param control_exchange: value passed unchanged to
                             ``messaging.set_transport_defaults``.
    """
    exchange = control_exchange
    messaging.set_transport_defaults(exchange)
def add_extra_exmods(*args):
    """Append every positional argument to ``EXTRA_EXMODS``, in order."""
    for module_name in args:
        EXTRA_EXMODS.append(module_name)
def clear_extra_exmods():
    """Empty ``EXTRA_EXMODS`` in place (the list object itself is kept)."""
    EXTRA_EXMODS.clear()
def get_allowed_exmods():
    """Return a new list: ``ALLOWED_EXMODS`` followed by ``EXTRA_EXMODS``."""
    return [*ALLOWED_EXMODS, *EXTRA_EXMODS]
class RequestContextSerializer(messaging.Serializer):
    """Serializer that delegates entities to an optional base serializer.

    Entities are handed to the wrapped base serializer when one is set and
    returned unchanged otherwise.  Contexts are converted with ``to_dict()``
    and rebuilt with ``watcher_context.RequestContext.from_dict()``.
    """

    def __init__(self, base):
        """Store ``base``, the serializer used for entities (may be falsy)."""
        self._base = base

    def serialize_entity(self, context, entity):
        """Serialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.serialize_entity(context, entity)
        # No base serializer: pass the entity through untouched.
        return entity

    def deserialize_entity(self, context, entity):
        """Deserialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.deserialize_entity(context, entity)
        # No base serializer: pass the entity through untouched.
        return entity

    def serialize_context(self, context):
        """Return ``context.to_dict()``."""
        as_dict = context.to_dict()
        return as_dict

    def deserialize_context(self, context):
        """Build a watcher ``RequestContext`` from the ``context`` value."""
        request_context_cls = watcher_context.RequestContext
        return request_context_cls.from_dict(context)
def get_client(target, version_cap=None, serializer=None):
    """Return an RPC client bound to the module-level ``TRANSPORT``.

    :param target: passed through to ``messaging.get_rpc_client``.
    :param version_cap: passed through as ``version_cap``.
    :param serializer: optional base serializer; it is always wrapped in a
                       ``RequestContextSerializer`` before use.
    :raises AssertionError: if ``TRANSPORT`` is None (assert statement).
    """
    assert TRANSPORT is not None
    wrapped = RequestContextSerializer(serializer)
    client_kwargs = {'version_cap': version_cap, 'serializer': wrapped}
    return messaging.get_rpc_client(TRANSPORT, target, **client_kwargs)
def get_server(target, endpoints, serializer=None):
    """Return an RPC server bound to the module-level ``TRANSPORT``.

    The server is created with ``dispatcher.DefaultRPCAccessPolicy`` as its
    access policy.

    :param target: passed through to ``messaging.get_rpc_server``.
    :param endpoints: passed through to ``messaging.get_rpc_server``.
    :param serializer: optional base serializer; it is always wrapped in a
                       ``RequestContextSerializer`` before use.
    :raises AssertionError: if ``TRANSPORT`` is None (assert statement).
    """
    assert TRANSPORT is not None
    server_kwargs = {
        'serializer': RequestContextSerializer(serializer),
        'access_policy': dispatcher.DefaultRPCAccessPolicy,
    }
    return messaging.get_rpc_server(
        TRANSPORT, target, endpoints, **server_kwargs)
def get_notification_listener(targets, endpoints, serializer=None, pool=None):
    """Return a notification listener on ``NOTIFICATION_TRANSPORT``.

    The listener is created with ``allow_requeue=False``.

    :param targets: passed through to ``messaging.get_notification_listener``.
    :param endpoints: passed through to
                      ``messaging.get_notification_listener``.
    :param serializer: optional base serializer; it is always wrapped in a
                       ``RequestContextSerializer`` before use.
    :param pool: passed through as ``pool``.
    :raises AssertionError: if ``NOTIFICATION_TRANSPORT`` is None (assert
                            statement).
    """
    assert NOTIFICATION_TRANSPORT is not None
    wrapped = RequestContextSerializer(serializer)
    listener_kwargs = {
        'allow_requeue': False,
        'pool': pool,
        'serializer': wrapped,
    }
    return messaging.get_notification_listener(
        NOTIFICATION_TRANSPORT, targets, endpoints, **listener_kwargs)
def get_notifier(publisher_id):
    """Return ``NOTIFIER.prepare(publisher_id=publisher_id)``.

    :param publisher_id: forwarded as the ``publisher_id`` keyword.
    :raises AssertionError: if ``NOTIFIER`` is None (assert statement).
    """
    assert NOTIFIER is not None
    prepared = NOTIFIER.prepare(publisher_id=publisher_id)
    return prepared