Coverage for watcher/decision_engine/messaging/data_model_endpoint.py: 84%
30 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-17 12:22 +0000
1# -*- encoding: utf-8 -*-
2# Copyright 2019 ZTE corporation.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from watcher.common import exception
19from watcher.common import utils
20from watcher.decision_engine.model.collector import manager
21from watcher import objects
24class DataModelEndpoint(object):
25 def __init__(self, messaging):
26 self._messaging = messaging
28 def get_audit_scope(self, context, audit=None):
29 scope = None
30 try:
31 if utils.is_uuid_like(audit) or utils.is_int_like(audit):
32 audit = objects.Audit.get(
33 context, audit)
34 else:
35 audit = objects.Audit.get_by_name(
36 context, audit)
37 except exception.AuditNotFound:
38 raise exception.InvalidIdentity(identity=audit)
39 if audit: 39 ↛ 42line 39 didn't jump to line 42 because the condition on line 39 was always true
40 scope = audit.scope
41 else:
42 scope = []
43 return scope
45 def get_data_model_info(self, context, data_model_type='compute',
46 audit=None):
47 if audit is not None: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 scope = self.get_audit_scope(context, audit)
49 else:
50 scope = []
51 collector_manager = manager.CollectorManager()
52 collector = collector_manager.get_cluster_model_collector(
53 data_model_type)
54 audit_scope_handler = collector.get_audit_scope_handler(
55 audit_scope=scope)
56 available_data_model = audit_scope_handler.get_scoped_model(
57 collector.get_latest_cluster_data_model())
58 if not available_data_model: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true
59 return {"context": []}
60 return {"context": available_data_model.to_list()}