Coverage for src \ nuremics \ core \ process.py: 90%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 18:48 +0100

1from __future__ import annotations 

2 

3import json 

4import os 

5import sys 

6from pathlib import Path 

7from typing import Callable 

8 

9import attrs 

10import pandas as pd 

11from termcolor import colored 

12 

13from .utils import ( 

14 concat_lists_unique, 

15 convert_value, 

16) 

17 

18 

19@attrs.define 

20class Process: 

21 

22 name: str = attrs.field(default=None) 

23 study: str = attrs.field(default=None) 

24 df_user_params: pd.DataFrame = attrs.field(default=None) 

25 dict_user_params: dict = attrs.field(default=None) 

26 dict_user_paths: dict = attrs.field(default=None) 

27 dict_paths: dict = attrs.field(factory=dict) 

28 is_case: bool = attrs.field(default=True) 

29 params: dict = attrs.field(factory=dict) 

30 allparams: list = attrs.field(factory=list) 

31 paths: dict = attrs.field(factory=dict) 

32 allpaths: list = attrs.field(factory=list) 

33 fixed_params: list = attrs.field(default=None) 

34 variable_params: list = attrs.field(default=None) 

35 fixed_paths: list = attrs.field(default=None) 

36 variable_paths: list = attrs.field(default=None) 

37 fixed_params_proc: list = attrs.field(factory=list) 

38 variable_params_proc: list = attrs.field(factory=list) 

39 fixed_paths_proc: list = attrs.field(factory=list) 

40 variable_paths_proc: list = attrs.field(factory=list) 

41 df_params: pd.DataFrame = attrs.field(default=None) 

42 dict_inputs: dict = attrs.field(default=None) 

43 dict_hard_params: dict = attrs.field(factory=dict) 

44 output_paths: dict = attrs.field(factory=dict) 

45 overall_analysis: dict = attrs.field(factory=dict) 

46 dict_analysis: dict = attrs.field(factory=dict) 

47 required_paths: dict = attrs.field(factory=dict) 

48 silent: bool = attrs.field(default=False) 

49 index: str = attrs.field(default=None) 

50 diagram: dict = attrs.field(default={}) 

51 set_inputs: bool = attrs.field(default=False) 

52 

53 def initialize(self) -> None: 

54 

55 self.variable_params_proc = [x for x in self.variable_params if x in list(self.params.values())] 

56 self.fixed_params_proc = [x for x in self.fixed_params if x in list(self.params.values())] 

57 self.fixed_paths_proc = [x for x in self.fixed_paths if x in list(self.paths.values())] 

58 self.variable_paths_proc = [x for x in self.variable_paths if x in list(self.paths.values())] 

59 

60 # Define list with all parameters considering dependencies with previous processes 

61 self.allparams = list(self.params.values()).copy() 

62 for required_paths in list(self.required_paths.values()): 

63 for _, value in self.diagram.items(): 

64 if required_paths in value.get("output_paths"): 

65 self.allparams = concat_lists_unique( 

66 list1=list(self.params.values()), 

67 list2=value["allparams"], 

68 ) 

69 

70 # Define list with all paths considering dependencies with previous processes 

71 self.allpaths = list(self.paths.values()).copy() 

72 for required_path in list(self.required_paths.values()): 

73 for _, value in self.diagram.items(): 

74 if required_path in value.get("output_paths"): 

75 self.allpaths = concat_lists_unique( 

76 list1=list(self.paths.values()), 

77 list2=value["allpaths"], 

78 ) 

79 

80 if self.is_case: 

81 self.on_params_update() 

82 

83 def __call__(self) -> None: 

84 

85 # Update dictionary of parameters 

86 if not self.set_inputs: 

87 self.update_dict_inputs() 

88 

89 for param, value in self.dict_inputs.items(): 

90 setattr(self, param, value) 

91 

92 # Printing 

93 print(colored(f"> {param} = {value}", "blue")) 

94 

95 # Printing 

96 print(colored(">>> START", "green")) 

97 

98 def on_params_update(self) -> None: 

99 

100 # Create parameters dataframe and fill with variable parameters 

101 if (len(self.variable_params_proc) > 0) or (len(self.variable_paths_proc) > 0): 

102 self.df_params = self.df_user_params[self.variable_params_proc].copy() 

103 

104 # There is no variable parameters / paths 

105 else: 

106 

107 # Check parameters / paths dependencies 

108 variable_params = [x for x in self.variable_params if x in self.allparams] 

109 variable_paths = [x for x in self.variable_paths if x in self.allpaths] 

110 

111 # There are variable parameters / paths from previous process 

112 if (len(variable_params) > 0) or (len(variable_paths) > 0): 

113 self.df_params = pd.DataFrame(self.df_user_params.index, columns=["ID"]).set_index("ID") 

114 # There is no variable parameter from previous process 

115 else: 

116 self.is_case = False 

117 

118 # Add fixed parameters to the dataframe 

119 if self.is_case: 

120 for param in self.fixed_params_proc: 

121 self.df_params[param] = self.dict_user_params[param] 

122 

123 def update_dict_inputs(self) -> None: 

124 

125 # Add user parameters 

126 if self.is_case: 

127 

128 self.dict_inputs = {} 

129 params_inv = {v: k for k, v in self.params.items()} 

130 for param in self.df_params.columns: 

131 value = convert_value(self.df_params.at[self.index, param]) 

132 self.dict_inputs[params_inv[param]] = value 

133 else: 

134 self.dict_inputs = {k: self.dict_user_params[v] for k, v in self.params.items()} 

135 

136 # Add hard parameters 

137 for param, value in self.dict_hard_params.items(): 

138 self.dict_inputs[param] = value 

139 

140 # Add user paths 

141 paths_inv = {v: k for k, v in self.paths.items()} 

142 for file in self.fixed_paths_proc: 

143 self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file] 

144 for file in self.variable_paths_proc: 

145 self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file][self.index] 

146 

147 # Add previous output paths 

148 for key, value in self.required_paths.items(): 

149 output_path = self.get_output_path(value) 

150 self.dict_inputs[key] = output_path 

151 

152 # Add output analysis 

153 for out, value in self.overall_analysis.items(): 

154 self.dict_inputs[out] = value 

155 

156 # Add output paths 

157 for out, value in self.output_paths.items(): 

158 self.dict_inputs[out] = value 

159 

160 # Write json file containing all parameters 

161 with open("inputs.json", "w") as f: 

162 json.dump(self.dict_inputs, f, indent=4) 

163 

164 def get_output_path(self, 

165 output_path: str, 

166 ) -> Path: 

167 

168 # Initialize path to return 

169 path = None 

170 

171 if isinstance(self.dict_paths[output_path], dict): 

172 for key, value in self.dict_paths[output_path].items(): 

173 if key == self.index: 

174 path = value 

175 else: 

176 path = self.dict_paths[output_path] 

177 

178 if not Path(path).exists(): 

179 

180 # Printing 

181 print() 

182 print(colored(f"(X) Required {output_path} is missing :", "red")) 

183 print(colored("> Please execute the necessary previous process that will build it.", "red")) 

184 

185 sys.exit(1) 

186 

187 return path 

188 

189 def update_output(self, 

190 output_path: str, 

191 dump: str, 

192 ) -> None: 

193 

194 if output_path not in self.dict_paths: 

195 self.dict_paths[output_path] = None 

196 

197 if self.is_case: 

198 if self.dict_paths[output_path] is None: 

199 self.dict_paths[output_path] = {} 

200 self.dict_paths[output_path][self.index] = os.path.join(os.getcwd(), dump) 

201 else: 

202 self.dict_paths[output_path] = os.path.join(os.getcwd(), dump) 

203 

204 @staticmethod 

205 def analysis_function( 

206 func: Callable, 

207 ) -> Callable: 

208 

209 func._is_analysis = True 

210 return func 

211 

212 def process_output(self, 

213 out: str, 

214 func: Callable[..., None], 

215 **kwargs: object, 

216 ) -> None: 

217 

218 if not getattr(func, "_is_analysis", False): 

219 print(colored(f'(X) Function "{func.__name__}" is not a valid analysis function.', "red")) 

220 sys.exit(1) 

221 

222 output = self.dict_paths[out] 

223 analysis = self.dict_analysis[self.name] 

224 if isinstance(output, dict): 

225 func(output, analysis, **kwargs) 

226 

227 def finalize(self) -> None: 

228 

229 for _, value in self.output_paths.items(): 

230 self.update_output( 

231 output_path=value, 

232 dump=value, 

233 ) 

234 

235 print(colored("COMPLETED <<<", "green"))