Coverage for watcher/applier/rpcapi.py: 100%

35 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2015 b<>com 

3# Copyright (c) 2016 Intel Corp 

4# 

5# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com> 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

16# implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19 

20from watcher.common import exception 

21from watcher.common import service 

22from watcher.common import service_manager 

23from watcher.common import utils 

24 

25from watcher import conf 

26 

27CONF = conf.CONF 

28 

29 

class ApplierAPI(service.Service):
    """Service wrapper exposing applier requests to callers.

    The service is initialised with ``ApplierAPIManager`` as its manager
    class.
    """

    def __init__(self):
        super().__init__(ApplierAPIManager)

    def launch_action_plan(self, context, action_plan_uuid=None):
        """Request that the action plan identified by a UUID be launched.

        :param context: request context forwarded unchanged to the
            conductor client
        :param action_plan_uuid: UUID of the action plan to launch
        :raises: :class:`exception.InvalidUuidOrName` when
            ``action_plan_uuid`` is not UUID-like (this includes the
            default ``None``, assuming ``is_uuid_like`` rejects it)
        """
        uuid_is_valid = utils.is_uuid_like(action_plan_uuid)
        if not uuid_is_valid:
            raise exception.InvalidUuidOrName(name=action_plan_uuid)

        # The result of the cast is not used; nothing is returned to the
        # caller.
        self.conductor_client.cast(
            context,
            'launch_action_plan',
            action_plan_uuid=action_plan_uuid,
        )

41 

42 

class ApplierAPIManager(service_manager.ServiceManager):
    """Service manager settings used by :class:`ApplierAPI`.

    Only the publisher id and the conductor topic carry configuration
    values; every other property returns a fixed value (``None``, a
    version string, or an empty list).
    """

    @property
    def service_name(self):
        """Return ``None``: no service name is declared here."""
        return None

    @property
    def api_version(self):
        """Return the API version string."""
        return '1.0'

    @property
    def publisher_id(self):
        """Return the publisher id from the ``watcher_applier`` options."""
        applier_opts = CONF.watcher_applier
        return applier_opts.publisher_id

    @property
    def conductor_topic(self):
        """Return the conductor topic from the ``watcher_applier`` options."""
        applier_opts = CONF.watcher_applier
        return applier_opts.conductor_topic

    @property
    def notification_topics(self):
        """Return an empty list: no notification topics are declared."""
        return []

    @property
    def conductor_endpoints(self):
        """Return an empty list: no conductor endpoints are declared."""
        return []

    @property
    def notification_endpoints(self):
        """Return an empty list: no notification endpoints are declared."""
        return []

68 

69 @property 

70 def notification_endpoints(self): 

71 return []