Coverage for watcher/decision_engine/messaging/data_model_endpoint.py: 84%

30 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright 2019 ZTE corporation. 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

14# implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18from watcher.common import exception 

19from watcher.common import utils 

20from watcher.decision_engine.model.collector import manager 

21from watcher import objects 

22 

23 

24class DataModelEndpoint(object): 

25 def __init__(self, messaging): 

26 self._messaging = messaging 

27 

28 def get_audit_scope(self, context, audit=None): 

29 scope = None 

30 try: 

31 if utils.is_uuid_like(audit) or utils.is_int_like(audit): 

32 audit = objects.Audit.get( 

33 context, audit) 

34 else: 

35 audit = objects.Audit.get_by_name( 

36 context, audit) 

37 except exception.AuditNotFound: 

38 raise exception.InvalidIdentity(identity=audit) 

39 if audit: 39 ↛ 42line 39 didn't jump to line 42 because the condition on line 39 was always true

40 scope = audit.scope 

41 else: 

42 scope = [] 

43 return scope 

44 

45 def get_data_model_info(self, context, data_model_type='compute', 

46 audit=None): 

47 if audit is not None: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true

48 scope = self.get_audit_scope(context, audit) 

49 else: 

50 scope = [] 

51 collector_manager = manager.CollectorManager() 

52 collector = collector_manager.get_cluster_model_collector( 

53 data_model_type) 

54 audit_scope_handler = collector.get_audit_scope_handler( 

55 audit_scope=scope) 

56 available_data_model = audit_scope_handler.get_scoped_model( 

57 collector.get_latest_cluster_data_model()) 

58 if not available_data_model: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 return {"context": []} 

60 return {"context": available_data_model.to_list()}