Coverage for watcher/common/rpc.py: 79%

70 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# Copyright 2014 Red Hat, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from oslo_config import cfg 

17from oslo_log import log 

18import oslo_messaging as messaging 

19 

20from oslo_messaging.rpc import dispatcher 

21 

22from watcher.common import context as watcher_context 

23from watcher.common import exception 

24 

# Public API of this module. Every public helper defined below is listed so
# that ``from watcher.common.rpc import *`` exposes a consistent set of names.
__all__ = [
    'init',
    'initialized',
    'cleanup',
    'set_defaults',
    'add_extra_exmods',
    'clear_extra_exmods',
    'get_allowed_exmods',
    'RequestContextSerializer',
    'get_client',
    'get_server',
    'get_notification_listener',
    'get_notifier',
]

37 

# Global configuration object and this module's logger.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Messaging state shared by the helpers below: assigned by init() and reset
# to None by cleanup().
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None

# Exception modules reported by get_allowed_exmods(). EXTRA_EXMODS is mutated
# in place by add_extra_exmods() and clear_extra_exmods().
ALLOWED_EXMODS = [exception.__name__]
EXTRA_EXMODS = []


# Payload serializer that init() wraps in a RequestContextSerializer.
JsonPayloadSerializer = messaging.JsonPayloadSerializer

51 

52 

def init(conf):
    """Create the module-level transports and notifier from ``conf``.

    Sets the TRANSPORT, NOTIFICATION_TRANSPORT and NOTIFIER globals. Both
    transports are created with the modules from get_allowed_exmods() as
    allowed remote exception modules. When ``conf.notification_level`` is
    falsy, the notifier is created with the 'noop' driver.

    :param conf: configuration object passed to oslo.messaging; its
                 ``notification_level`` attribute is read here.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    allowed = get_allowed_exmods()
    TRANSPORT = messaging.get_rpc_transport(
        conf, allowed_remote_exmods=allowed)
    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf, allowed_remote_exmods=allowed)

    notifier_kwargs = {
        'serializer': RequestContextSerializer(JsonPayloadSerializer()),
    }
    if not conf.notification_level:
        # No notification level configured: use the 'noop' driver.
        notifier_kwargs['driver'] = 'noop'
    NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, **notifier_kwargs)

68 

69 

def initialized():
    """Return True when both TRANSPORT and NOTIFIER have been set.

    NOTIFICATION_TRANSPORT is not part of this check.
    """
    required = (TRANSPORT, NOTIFIER)
    return None not in required

72 

73 

def cleanup():
    """Release the module-level transports and reset the RPC globals.

    Calls ``cleanup()`` on TRANSPORT and NOTIFICATION_TRANSPORT when they are
    set, then resets TRANSPORT, NOTIFICATION_TRANSPORT and NOTIFIER to None.
    An error is logged when NOTIFIER is None on entry. Calling this before
    init(), or more than once, no longer raises AttributeError.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    if NOTIFIER is None:
        # No exception is being handled at this point, so LOG.exception()
        # would only attach a meaningless "NoneType: None" traceback; log a
        # plain error at the same level instead.
        LOG.error("RPC cleanup: NOTIFIER is None")
    for transport in (TRANSPORT, NOTIFICATION_TRANSPORT):
        # Skip transports that were never created (or already cleaned up).
        if transport is not None:
            transport.cleanup()
    TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None

81 

82 

def set_defaults(control_exchange):
    """Forward ``control_exchange`` to messaging.set_transport_defaults().

    :param control_exchange: value passed through unchanged.
    """
    messaging.set_transport_defaults(control_exchange)

85 

86 

def add_extra_exmods(*args):
    """Append each positional argument to the EXTRA_EXMODS list, in order."""
    for exmod in args:
        EXTRA_EXMODS.append(exmod)

89 

90 

def clear_extra_exmods():
    """Empty the EXTRA_EXMODS list in place (the list object is kept)."""
    EXTRA_EXMODS.clear()

93 

94 

def get_allowed_exmods():
    """Return a new list: ALLOWED_EXMODS followed by EXTRA_EXMODS."""
    exmods = list(ALLOWED_EXMODS)
    exmods.extend(EXTRA_EXMODS)
    return exmods

97 

98 

class RequestContextSerializer(messaging.Serializer):
    """Serializer that converts request contexts and delegates entities.

    Entity (de)serialization is forwarded to the wrapped ``base`` serializer
    when one is set; otherwise entities are returned unchanged. Contexts are
    converted with ``to_dict()`` and ``RequestContext.from_dict()``.
    """

    def __init__(self, base):
        """Keep ``base``, the wrapped entity serializer (may be falsy)."""
        self._base = base

    def serialize_entity(self, context, entity):
        """Serialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.serialize_entity(context, entity)
        return entity

    def deserialize_entity(self, context, entity):
        """Deserialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.deserialize_entity(context, entity)
        return entity

    def serialize_context(self, context):
        """Return the result of ``context.to_dict()``."""
        return context.to_dict()

    def deserialize_context(self, context):
        """Build a watcher RequestContext via ``from_dict(context)``."""
        return watcher_context.RequestContext.from_dict(context)

119 

120 

def get_client(target, version_cap=None, serializer=None):
    """Return an RPC client bound to the module-level TRANSPORT.

    :param target: passed through to messaging.get_rpc_client().
    :param version_cap: passed through as ``version_cap``.
    :param serializer: optional entity serializer; it is wrapped in a
                       RequestContextSerializer before use.
    :raises AssertionError: if TRANSPORT has not been set.
    """
    assert TRANSPORT is not None
    wrapped = RequestContextSerializer(serializer)
    return messaging.get_rpc_client(
        TRANSPORT, target, version_cap=version_cap, serializer=wrapped)

130 

131 

def get_server(target, endpoints, serializer=None):
    """Return an RPC server bound to the module-level TRANSPORT.

    The server is created with ``dispatcher.DefaultRPCAccessPolicy`` as its
    access policy.

    :param target: passed through to messaging.get_rpc_server().
    :param endpoints: passed through to messaging.get_rpc_server().
    :param serializer: optional entity serializer; it is wrapped in a
                       RequestContextSerializer before use.
    :raises AssertionError: if TRANSPORT has not been set.
    """
    assert TRANSPORT is not None
    wrapped = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(
        TRANSPORT, target, endpoints,
        serializer=wrapped,
        access_policy=dispatcher.DefaultRPCAccessPolicy)

143 

144 

def get_notification_listener(targets, endpoints, serializer=None, pool=None):
    """Return a notification listener on NOTIFICATION_TRANSPORT.

    The listener is always created with ``allow_requeue=False``.

    :param targets: passed through to messaging.get_notification_listener().
    :param endpoints: passed through unchanged.
    :param serializer: optional entity serializer; it is wrapped in a
                       RequestContextSerializer before use.
    :param pool: passed through as ``pool``.
    :raises AssertionError: if NOTIFICATION_TRANSPORT has not been set.
    """
    assert NOTIFICATION_TRANSPORT is not None
    wrapped = RequestContextSerializer(serializer)
    return messaging.get_notification_listener(
        NOTIFICATION_TRANSPORT, targets, endpoints,
        allow_requeue=False, pool=pool, serializer=wrapped)

156 

157 

def get_notifier(publisher_id):
    """Return ``NOTIFIER.prepare(publisher_id=publisher_id)``.

    :param publisher_id: forwarded as the ``publisher_id`` keyword.
    :raises AssertionError: if NOTIFIER has not been set.
    """
    assert NOTIFIER is not None
    prepared = NOTIFIER.prepare(publisher_id=publisher_id)
    return prepared

160 return NOTIFIER.prepare(publisher_id=publisher_id)