Coverage for watcher/decision_engine/threading.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# Copyright (c) 2019 European Organization for Nuclear Research (CERN) 

3# 

4# Authors: Corne Lukken <info@dantalion.nl> 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18 

19import copy 

20import futurist 

21from futurist import waiters 

22 

23from oslo_config import cfg 

24from oslo_log import log 

25from oslo_service import service 

26 

27CONF = cfg.CONF 

28LOG = log.getLogger(__name__) 

29 

30 

31class DecisionEngineThreadPool(object, metaclass=service.Singleton): 

32 """Singleton threadpool to submit general tasks to""" 

33 

34 def __init__(self): 

35 self.amount_workers = CONF.watcher_decision_engine.max_general_workers 

36 self._threadpool = futurist.GreenThreadPoolExecutor( 

37 max_workers=self.amount_workers) 

38 

39 def submit(self, fn, *args, **kwargs): 

40 """Will submit the job to the underlying threadpool 

41 

42 :param fn: function to execute in another thread 

43 :param args: arguments for the function 

44 :param kwargs: amount of arguments for the function 

45 :return: future to monitor progress of execution 

46 :rtype: :py:class"`futurist.GreenFuture` 

47 """ 

48 

49 return self._threadpool.submit(fn, *args, **kwargs) 

50 

51 @staticmethod 

52 def do_while_futures(futures, fn, *args, **kwargs): 

53 """Do while to execute a function upon completion from a collection 

54 

55 Will execute the specified function with its arguments when one of the 

56 futures from the passed collection finishes. Additionally, the future 

57 is passed as first argument to the function. Does not modify the passed 

58 collection of futures. 

59 

60 :param futures: list, set or dictionary of futures 

61 :type futures: list :py:class:`futurist.GreenFuture` 

62 :param fn: function to execute upon the future finishing execution 

63 :param args: arguments for the function 

64 :param kwargs: amount of arguments for the function 

65 """ 

66 

67 # shallow copy the collection to not modify it outside of this method. 

68 # shallow copy must be used because the type of collection needs to be 

69 # determined at runtime (can be both list, set and dict). 

70 futures = copy.copy(futures) 

71 

72 DecisionEngineThreadPool.do_while_futures_modify( 

73 futures, fn, *args, **kwargs) 

74 

75 @staticmethod 

76 def do_while_futures_modify(futures, fn, *args, **kwargs): 

77 """Do while to execute a function upon completion from a collection 

78 

79 Will execute the specified function with its arguments when one of the 

80 futures from the passed collection finishes. Additionally, the future 

81 is passed as first argument to the function. Modifies the collection 

82 by removing completed elements, 

83 

84 :param futures: list, set or dictionary of futures 

85 :type futures: list :py:class:`futurist.GreenFuture` 

86 :param fn: function to execute upon the future finishing execution 

87 :param args: arguments for the function 

88 :param kwargs: amount of arguments for the function 

89 """ 

90 

91 waits = waiters.wait_for_any(futures) 

92 while len(waits[0]) > 0 or len(waits[1]) > 0: 

93 for future in waiters.wait_for_any(futures)[0]: 

94 fn(future, *args, **kwargs) 

95 futures.remove(future) 

96 waits = waiters.wait_for_any(futures)