Coverage for src\nuremics\core\process.py: 90%
138 statements
coverage.py v7.13.4, created at 2026-02-13 18:48 +0100
1from __future__ import annotations
3import json
4import os
5import sys
6from pathlib import Path
7from typing import Callable
9import attrs
10import pandas as pd
11from termcolor import colored
13from .utils import (
14 concat_lists_unique,
15 convert_value,
16)
@attrs.define
class Process:
    """Single processing step that resolves its inputs and tracks its outputs.

    The instance gathers its inputs (user parameters, user paths, hard-coded
    parameters, outputs of previous processes) into ``dict_inputs``, exposes
    them as attributes when called, and records where its own outputs were
    written in ``dict_paths``.
    """

    name: str = attrs.field(default=None)  # key used to look up ``dict_analysis``
    study: str = attrs.field(default=None)
    df_user_params: pd.DataFrame = attrs.field(default=None)  # one row per case
    dict_user_params: dict = attrs.field(default=None)  # user param name -> fixed value
    dict_user_paths: dict = attrs.field(default=None)  # user path name -> path (or {index: path})
    dict_paths: dict = attrs.field(factory=dict)  # output name -> path, or {index: path}
    is_case: bool = attrs.field(default=True)  # True when the process runs once per case
    params: dict = attrs.field(factory=dict)  # input name -> user parameter name
    allparams: list = attrs.field(factory=list)  # own + upstream parameters
    paths: dict = attrs.field(factory=dict)  # input name -> user path name
    allpaths: list = attrs.field(factory=list)  # own + upstream paths
    fixed_params: list = attrs.field(default=None)
    variable_params: list = attrs.field(default=None)
    fixed_paths: list = attrs.field(default=None)
    variable_paths: list = attrs.field(default=None)
    fixed_params_proc: list = attrs.field(factory=list)  # fixed params used by this process
    variable_params_proc: list = attrs.field(factory=list)  # variable params used by this process
    fixed_paths_proc: list = attrs.field(factory=list)  # fixed paths used by this process
    variable_paths_proc: list = attrs.field(factory=list)  # variable paths used by this process
    df_params: pd.DataFrame = attrs.field(default=None)  # per-case parameters of this process
    dict_inputs: dict = attrs.field(default=None)  # resolved inputs, set as attributes on call
    dict_hard_params: dict = attrs.field(factory=dict)  # input name -> hard-coded value
    output_paths: dict = attrs.field(factory=dict)  # input name -> output name
    overall_analysis: dict = attrs.field(factory=dict)
    dict_analysis: dict = attrs.field(factory=dict)  # process name -> analysis settings
    required_paths: dict = attrs.field(factory=dict)  # input name -> upstream output name
    silent: bool = attrs.field(default=False)
    index: str = attrs.field(default=None)  # label of the current case
    # A literal ``{}`` default would be one dict shared by every instance;
    # a factory gives each instance its own.
    diagram: dict = attrs.field(factory=dict)  # process key -> {"output_paths", "allparams", "allpaths"}
    set_inputs: bool = attrs.field(default=False)  # True when dict_inputs is provided externally

    def initialize(self) -> None:
        """Split parameters/paths by kind and collect upstream dependencies.

        Fills the ``*_proc`` lists with the fixed/variable parameters and paths
        this process actually uses, then builds ``allparams`` / ``allpaths`` by
        merging in the entries of every diagram node that produces one of the
        required paths. Finally refreshes the parameters dataframe when the
        process is case-dependent.
        """
        own_params = list(self.params.values())
        own_paths = list(self.paths.values())

        self.variable_params_proc = [x for x in self.variable_params if x in own_params]
        self.fixed_params_proc = [x for x in self.fixed_params if x in own_params]
        self.fixed_paths_proc = [x for x in self.fixed_paths if x in own_paths]
        self.variable_paths_proc = [x for x in self.variable_paths if x in own_paths]

        # Define lists with all parameters / paths considering dependencies
        # with previous processes. The merge accumulates into the running
        # lists so that several upstream producers all contribute (restarting
        # from the own parameters on every match would keep only the last one).
        self.allparams = list(own_params)
        self.allpaths = list(own_paths)
        for required_path in self.required_paths.values():
            for upstream in self.diagram.values():
                # A node without "output_paths" simply produces nothing.
                if required_path not in (upstream.get("output_paths") or ()):
                    continue
                self.allparams = concat_lists_unique(
                    list1=self.allparams,
                    list2=upstream["allparams"],
                )
                self.allpaths = concat_lists_unique(
                    list1=self.allpaths,
                    list2=upstream["allpaths"],
                )

        if self.is_case:
            self.on_params_update()

    def __call__(self) -> None:
        """Resolve the inputs, expose them as attributes and announce the start."""
        # Update dictionary of parameters
        if not self.set_inputs:
            self.update_dict_inputs()

        for param, value in self.dict_inputs.items():
            setattr(self, param, value)

            # Printing
            print(colored(f"> {param} = {value}", "blue"))

        # Printing
        print(colored(">>> START", "green"))

    def on_params_update(self) -> None:
        """Rebuild ``df_params`` and decide whether the process is per-case.

        The process stays case-dependent when it uses variable parameters or
        paths itself, or when any of its dependencies (``allparams`` /
        ``allpaths``) is variable; otherwise ``is_case`` is switched off.
        """
        # Create parameters dataframe and fill with variable parameters
        if self.variable_params_proc or self.variable_paths_proc:
            self.df_params = self.df_user_params[self.variable_params_proc].copy()

        # There are no variable parameters / paths
        else:
            # Check parameters / paths dependencies
            variable_params = [x for x in self.variable_params if x in self.allparams]
            variable_paths = [x for x in self.variable_paths if x in self.allpaths]

            # There are variable parameters / paths from previous process
            if variable_params or variable_paths:
                # Keep only the case index, with no parameter column
                self.df_params = pd.DataFrame(self.df_user_params.index, columns=["ID"]).set_index("ID")
            # There is no variable parameter from previous process
            else:
                self.is_case = False

        # Add fixed parameters to the dataframe
        if self.is_case:
            for param in self.fixed_params_proc:
                self.df_params[param] = self.dict_user_params[param]

    def update_dict_inputs(self) -> None:
        """Build ``dict_inputs`` from every input source and dump it to JSON.

        Sources are applied in this order (later ones overwrite earlier ones on
        key collision): user parameters, hard parameters, user paths, outputs
        of previous processes, overall analysis, output paths. The result is
        written to ``inputs.json`` in the current working directory.
        """
        # Add user parameters
        if self.is_case:
            params_inv = {v: k for k, v in self.params.items()}
            self.dict_inputs = {}
            for param in self.df_params.columns:
                value = convert_value(self.df_params.at[self.index, param])
                self.dict_inputs[params_inv[param]] = value
        else:
            self.dict_inputs = {k: self.dict_user_params[v] for k, v in self.params.items()}

        # Add hard parameters
        self.dict_inputs.update(self.dict_hard_params)

        # Add user paths
        paths_inv = {v: k for k, v in self.paths.items()}
        for file in self.fixed_paths_proc:
            self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file]
        for file in self.variable_paths_proc:
            self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file][self.index]

        # Add previous output paths
        for key, value in self.required_paths.items():
            self.dict_inputs[key] = self.get_output_path(value)

        # Add output analysis
        self.dict_inputs.update(self.overall_analysis)

        # Add output paths
        self.dict_inputs.update(self.output_paths)

        # Write json file containing all parameters
        with open("inputs.json", "w", encoding="utf-8") as f:
            json.dump(self.dict_inputs, f, indent=4)

    def get_output_path(self,
        output_path: str,
    ) -> Path:
        """Return the location of an output built by a previous process.

        Args:
            output_path: Name of the required output, as registered in
                ``dict_paths``.

        Returns:
            The registered path, exactly as stored in ``dict_paths`` (the
            entry for ``self.index`` when the output is stored per case).

        The program exits with status 1 after printing an error message when
        the output is not registered, has no entry for the current case, or
        does not exist on disk.
        """
        stored = self.dict_paths.get(output_path)
        path = stored.get(self.index) if isinstance(stored, dict) else stored

        # ``None`` means nothing was registered for this output / case: report
        # it as missing instead of failing inside ``Path(None)``.
        if path is None or not Path(path).exists():

            # Printing
            print()
            print(colored(f"(X) Required {output_path} is missing :", "red"))
            print(colored("> Please execute the necessary previous process that will build it.", "red"))

            sys.exit(1)

        return path

    def update_output(self,
        output_path: str,
        dump: str,
    ) -> None:
        """Register where an output was written.

        Args:
            output_path: Name under which the output is registered in
                ``dict_paths``.
            dump: File or directory name, joined to the current working
                directory.

        For a case-dependent process the location is stored under
        ``self.index`` in a per-output dict; otherwise it is stored directly.
        """
        location = os.path.join(os.getcwd(), dump)

        if not self.is_case:
            self.dict_paths[output_path] = location
            return

        if self.dict_paths.get(output_path) is None:
            self.dict_paths[output_path] = {}
        self.dict_paths[output_path][self.index] = location

    @staticmethod
    def analysis_function(
        func: Callable,
    ) -> Callable:
        """Decorator flagging ``func`` as accepted by ``process_output``."""
        func._is_analysis = True
        return func

    def process_output(self,
        out: str,
        func: Callable[..., None],
        **kwargs: object,
    ) -> None:
        """Run an analysis function on a per-case output.

        Args:
            out: Name of the output in ``dict_paths``.
            func: Function previously marked with ``analysis_function``.
            **kwargs: Extra keyword arguments forwarded to ``func``.

        ``func`` is called as ``func(output, analysis, **kwargs)`` only when
        the registered output is a dict (one entry per case). The program
        exits with status 1 when ``func`` was not marked as an analysis
        function.
        """
        if not getattr(func, "_is_analysis", False):
            print(colored(f'(X) Function "{func.__name__}" is not a valid analysis function.', "red"))
            sys.exit(1)

        output = self.dict_paths[out]
        analysis = self.dict_analysis[self.name]
        if isinstance(output, dict):
            func(output, analysis, **kwargs)

    def finalize(self) -> None:
        """Register every declared output and announce completion."""
        # Each output is dumped under its own name in the working directory.
        for value in self.output_paths.values():
            self.update_output(
                output_path=value,
                dump=value,
            )

        print(colored("COMPLETED <<<", "green"))