Coverage for src \ nuremics \ core \ workflow.py: 81%
939 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 18:48 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 18:48 +0100
1from __future__ import annotations
3import json
4import os
5import pathlib
6import shutil
7import sys
8from importlib.resources import files
9from pathlib import Path
11import numpy as np
12import pandas as pd
13from termcolor import colored
15from .process import Process
16from .utils import (
17 extract_analysis,
18 extract_inputs_and_types,
19 extract_outputs,
20 get_self_method_calls,
21 only_function_calls,
22)
25class WorkFlow:
def __init__(
    self,
    app_name: str,
    config_path: Path,
    workflow: list,
    silent: bool = False,
) -> None:
    """Initialise the workflow state and synchronise ``settings.json``.

    Args:
        app_name: Key used for this application in the ``"apps"`` section
            of the settings file.
        config_path: Directory holding ``settings.json``; it is created
            (parents included) when missing.
        workflow: List of mappings, each with at least a ``"process"``
            entry whose ``__name__`` is recorded in ``list_processes``.
        silent: Stored as ``self.silent``.
    """
    # Constructor arguments
    self.app_name = app_name
    self.config_path = config_path
    self.list_workflow = workflow
    self.silent = silent

    # Flat lists
    self.list_processes = []
    self.user_params = []
    self.user_paths = []
    self.output_paths = []
    self.overall_analysis = []

    # General dictionaries
    self.dict_inputs = {}
    self.dict_datasets = {}
    self.dict_studies = {}
    self.dict_process = {}
    self.dict_analysis = {}
    self.dict_fixed_params = {}
    self.dict_variable_params = {}
    self.dict_user_paths = {}
    self.dict_paths = {}
    self.params_type = {}
    self.diagram = {}

    # Per-process descriptions
    self.operations_by_process = {}
    self.inputs_by_process = {}
    self.params_by_process = {}
    self.paths_by_process = {}
    self.outputs_by_process = {}
    self.analysis_by_process = {}
    self.settings_by_process = {}

    # Process-to-user plugs
    self.params_plug = {}
    self.paths_plug = {}
    self.outputs_plug = {}
    self.analysis_plug = {}

    # Per-study status
    self.studies_modif = {}
    self.studies_messages = {}
    self.studies_config = {}

    # Fixed / variable inputs bookkeeping
    self.fixed_params_messages = {}
    self.fixed_params_config = {}
    self.fixed_paths_messages = {}
    self.fixed_paths_config = {}
    self.variable_params_messages = {}
    self.variable_params_config = {}
    self.variable_paths_messages = {}
    self.variable_paths_config = {}
    self.fixed_params = {}
    self.fixed_paths = {}
    self.variable_params = {}
    self.variable_paths = {}

    # Make sure the configuration directory exists
    self.config_path.mkdir(parents=True, exist_ok=True)

    # Create the settings file with empty defaults on first use
    settings_file = self.config_path / "settings.json"
    if not settings_file.exists():
        with open(settings_file, "w") as stream:
            json.dump({"default_working_dir": None, "apps": {}}, stream, indent=4)

    # Load the settings
    with open(settings_file) as stream:
        self.dict_settings = json.load(stream)

    # Register this application when it is not known yet
    apps = self.dict_settings["apps"]
    if self.app_name not in apps:
        apps[self.app_name] = {"working_dir": None}

    # Default working directory: first one configured by any application
    if self.dict_settings["default_working_dir"] is None:
        first_configured = next(
            (
                entry["working_dir"]
                for entry in apps.values()
                if entry["working_dir"] is not None
            ),
            None,
        )
        if first_configured is not None:
            self.dict_settings["default_working_dir"] = first_configured

    # Persist the (possibly updated) settings
    with open(settings_file, "w") as stream:
        json.dump(self.dict_settings, stream, indent=4)

    # Names of the processes, in workflow order
    self.list_processes.extend(
        step["process"].__name__ for step in self.list_workflow
    )
def print_logo(self) -> None:
    """Print the ASCII logo stored in ``nuremics.resources/logo.txt`` in yellow.

    The resource is opened through its own ``open()`` method inside a
    ``with`` block, so the handle is always closed. An empty file prints
    only the leading blank line instead of failing on an unbound local.
    """
    logo = files("nuremics.resources").joinpath("logo.txt")
    with logo.open("r") as stream:
        # The previous implementation consumed the first line with
        # ``for line in f`` before calling ``readlines()``, so that line was
        # never displayed. The visible output is kept unchanged here.
        # NOTE(review): confirm whether dropping the first line is intended.
        next(stream, None)
        lines = stream.readlines()

    print()
    for line in lines:
        print(colored(line.rstrip(), "yellow"))
def print_application(self) -> None:
    """Print the application diagram: processes and their operations.

    Each process of the workflow is instantiated, its operations are
    collected with ``get_self_method_calls`` and stored in
    ``self.operations_by_process``. A process whose ``__call__`` is
    rejected by ``only_function_calls`` is flagged with ``(X)``; if any
    process is flagged, a usage reminder is printed and the program exits
    with status 1.
    """
    # Header
    print()
    print(colored("> APPLICATION <", "blue", attrs=["reverse"]))
    print()
    print(colored("| Workflow |", "magenta"))
    print(colored(f"{self.app_name}_____", "blue"))

    # Blank margin as wide as the application name plus its underscores
    margin = " " * (len(self.app_name) + 5)

    invalid_found = False
    for position, step in enumerate(self.list_workflow):

        proc_name = step["process"].__name__
        instance: Process = step["process"]()
        has_next = position < len(self.list_workflow) - 1

        # Operations of the current process
        operations = get_self_method_calls(instance.__class__)
        self.operations_by_process[proc_name] = operations

        # The process call must contain only calls to its operations
        call_is_valid = only_function_calls(
            method=instance.__call__,
            allowed_methods=operations,
        )

        branch = f"{margin}|_____{proc_name}_____"
        if call_is_valid:
            print(colored(branch, "blue"))

            # The vertical rail continues only when another process follows
            rail = "|" if has_next else " "
            padding = " " * (len(proc_name) + 10)
            for op_name in operations:
                print(colored(f"{margin}{rail}{padding}|_____{op_name}", "blue"))
        else:
            print(colored(branch, "blue") + colored("(X)", "red"))
            invalid_found = True

        if has_next:
            print(colored(f"{margin}|", "blue"))

    if not invalid_found:
        return

    # At least one process calls something else than its own operations
    print()
    print(colored("(X) Each process must only call its internal function(s):", "red"))
    print()
    print(colored(" def __call__(self):", "red"))
    print(colored(" super().__call__()", "red"))
    print()
    print(colored(" self.operation1()", "red"))
    print(colored(" self.operation2()", "red"))
    print(colored(" self.operation3()", "red"))
    print(colored(" ...", "red"))
    sys.exit(1)
def set_working_directory(self) -> None:
    """Resolve, create and enter the working directory of the application.

    The directory is ``<working_dir setting>/<app_name>``. When the
    application has no ``"working_dir"`` configured, an error pointing to
    the settings file is printed and the program exits with status 1.
    The settings file is rewritten, and the process current directory is
    changed to the working directory.
    """
    settings_file = self.config_path / "settings.json"

    configured_root = self.dict_settings["apps"][self.app_name]["working_dir"]
    if configured_root is None:
        print()
        print(colored(f'(X) Please define {self.app_name} "working_dir" in file :', "red"))
        print(colored(f"> {settings_file}", "red"))
        sys.exit(1)

    self.working_dir = Path(configured_root) / self.app_name

    # Write settings file
    with open(settings_file, "w") as stream:
        json.dump(self.dict_settings, stream, indent=4)

    # Create the working directory, then move into it
    self.working_dir.mkdir(parents=True, exist_ok=True)
    os.chdir(self.working_dir)
def get_inputs(self) -> None:
    """Collect the inputs of every process and how the workflow plugs them.

    For each workflow entry the process is instantiated and inspected with
    ``extract_inputs_and_types`` and ``extract_analysis``. Inputs that are
    not analysis inputs are split into paths (subclasses of
    ``pathlib.Path``) and parameters. Each input is then associated with
    ``[value, source]`` taken from the first workflow section declaring it,
    or ``None`` when no section does. Analysis inputs are associated with
    their ``"overall_analysis"`` value, or ``None``.
    """

    def find_plug(step: dict, key: str, sources: tuple) -> list | None:
        # First section of the workflow entry that declares ``key``
        for source in sources:
            if (source in step) and (key in step[source]):
                return [step[source][key], source]
        return None

    for step in self.list_workflow:

        name = step["process"].__name__
        instance: Process = step["process"]()

        self.inputs_by_process[name] = extract_inputs_and_types(instance)
        self.analysis_by_process[name] = extract_analysis(instance)

        if "settings" in step:
            self.settings_by_process[name] = step["settings"]

        self.params_by_process[name] = {}
        self.paths_by_process[name] = []
        self.params_plug[name] = {}
        self.paths_plug[name] = {}
        self.analysis_plug[name] = {}

        for key, value_type in self.inputs_by_process[name].items():

            # Printable type: bare name for builtins, dotted name otherwise
            module_name = value_type.__module__
            type_name = value_type.__name__
            type_label = type_name if module_name == "builtins" else f"{module_name}.{type_name}"

            if key in self.analysis_by_process[name]:
                # Analysis input
                if ("overall_analysis" in step) and (key in step["overall_analysis"]):
                    self.analysis_plug[name][key] = step["overall_analysis"][key]
                else:
                    self.analysis_plug[name][key] = None
            elif issubclass(value_type, pathlib.Path):
                # Input path
                self.paths_by_process[name].append(key)
                self.paths_plug[name][key] = find_plug(
                    step, key, ("user_paths", "required_paths"),
                )
            else:
                # Input parameter
                self.params_by_process[name][key] = [value_type, type_label]
                self.params_plug[name][key] = find_plug(
                    step, key, ("user_params", "hard_params"),
                )
def get_outputs(self) -> None:
    """Collect the outputs of every process and their workflow names.

    Each process is instantiated and inspected with ``extract_outputs``.
    Every output is mapped to the value declared for it in the entry's
    ``"output_paths"`` section, or to ``None`` when it is not declared.
    """
    for step in self.list_workflow:

        name = step["process"].__name__
        instance: Process = step["process"]()

        self.outputs_by_process[name] = extract_outputs(instance)
        self.outputs_plug[name] = {
            output: (
                step["output_paths"][output]
                if ("output_paths" in step) and (output in step["output_paths"])
                else None
            )
            for output in self.outputs_by_process[name]
        }
def init_config(self) -> None:
    """Validate the workflow sections and gather the user-facing names.

    Fills ``self.user_params``, ``self.user_paths``, ``self.output_paths``
    and ``self.overall_analysis`` from the workflow entries, in order.
    Any inconsistency (unknown key, required or analysed path not produced
    by a previous ``"output_paths"``, output declared twice) prints an
    error and exits with status 1. Duplicates are removed at the end while
    keeping first-seen order.
    """

    def fail(message: str) -> None:
        # Report a configuration error and stop the program
        print()
        print(colored(message, "red"))
        sys.exit(1)

    for step in self.list_workflow:

        name = step["process"].__name__

        # User parameters
        if "user_params" in step:
            for key, value in step["user_params"].items():
                if key not in self.params_by_process[name]:
                    fail(f'(X) {key} defined in "user_params" is not an input parameter of {name}.')
                self.user_params.append(value)

        # Hard parameters (validation only)
        if "hard_params" in step:
            for key in step["hard_params"]:
                if key not in self.params_by_process[name]:
                    fail(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.')

        # User paths
        if "user_paths" in step:
            for key, value in step["user_paths"].items():
                if key not in self.paths_by_process[name]:
                    fail(f"(X) {key} is not an input path of {name}.")
                self.user_paths.append(value)

        # Required paths must already have been produced
        if "required_paths" in step:
            for value in step["required_paths"].values():
                if value not in self.output_paths:
                    fail(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".')

        # Output paths
        if "output_paths" in step:
            for key, value in step["output_paths"].items():
                if key not in self.outputs_by_process[name]:
                    fail(f"(X) {key} is not an output path of {name}.")
                if value in self.output_paths:
                    fail(f'(X) {value} is defined twice in "output_paths".')
                self.output_paths.append(value)

        # Outputs used for analysis
        if "overall_analysis" in step:
            for key, value in step["overall_analysis"].items():
                if key not in self.analysis_by_process[name]:
                    fail(f"(X) {key} is not an output analysis of {name}.")
                self.overall_analysis.append(value)
                if value not in self.output_paths:
                    fail(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".')

    # Drop duplicates, first occurrence wins
    self.user_params = list(dict.fromkeys(self.user_params))
    self.user_paths = list(dict.fromkeys(self.user_paths))
    self.overall_analysis = list(dict.fromkeys(self.overall_analysis))
def print_processes(self) -> None:
    """Print, for each process, how its inputs and outputs are plugged.

    Four sections are printed per process: input parameters, input paths,
    input analysis and output paths. Each row shows the process-side name
    on the left and the workflow-side definition on the right (green), or
    ``Not defined (X)`` (red) when nothing is plugged. As soon as a section
    contains an undefined row, a hint is printed and the program exits
    with status 1.
    """

    def show_table(left_rows: list, right_rows: list) -> None:
        # Every column is padded to its longest cell plus one space
        left_widths = [
            max(len(row[col]) for row in left_rows) + 1
            for col in range(len(left_rows[0]))
        ]
        label_width = max(len(label) for label, _ in right_rows) + 1
        origin_width = max(len(origin) for _, origin in right_rows) + 1

        for cells, (label, origin) in zip(left_rows, right_rows):
            proc_str = "".join(
                cell.ljust(width) for cell, width in zip(cells, left_widths)
            ) + "-----|"
            user_str = "|----- " + label.ljust(label_width) + origin.ljust(origin_width)
            color = "red" if "(X)" in user_str else "green"
            print(colored(proc_str, "blue") + colored(user_str, color))

    def stop(message: str) -> None:
        # Print the hint of an incomplete section and exit
        print()
        print(colored(message, "red"))
        sys.exit(1)

    undefined = ("Not defined", "(X)")

    for step in self.list_workflow:

        name = step["process"].__name__

        print()
        print(colored(f"| {name} |", "magenta"))

        # Input parameters
        print(colored("> Input Parameter(s) :", "blue"))
        if not self.params_by_process[name]:
            print(colored("None.", "blue"))
        else:
            left, right, incomplete = [], [], False
            for key, value in self.params_by_process[name].items():
                left.append((f"({value[1]})", key))
                plug = self.params_plug[name][key]
                if plug is None:
                    right.append(undefined)
                    incomplete = True
                else:
                    right.append((str(plug[0]), f"({plug[1]})"))
            show_table(left, right)
            if incomplete:
                stop('(X) Please define all input parameters either in "user_params" or "hard_params".')

        # Input paths
        print(colored("> Input Path(s) :", "blue"))
        if not self.paths_by_process[name]:
            print(colored("None.", "blue"))
        else:
            left, right, incomplete = [], [], False
            for path in self.paths_by_process[name]:
                left.append((path,))
                plug = self.paths_plug[name][path]
                if plug is None:
                    right.append(undefined)
                    incomplete = True
                else:
                    right.append((plug[0], f"({plug[1]})"))
            show_table(left, right)
            if incomplete:
                stop('(X) Please define all input paths either in "user_paths" or "required_paths".')

        # Input analysis
        print(colored("> Input Analysis :", "blue"))
        if not self.analysis_by_process[name]:
            print(colored("None.", "blue"))
        else:
            left, right, incomplete = [], [], False
            for out in self.analysis_by_process[name]:
                left.append((out,))
                plug = self.analysis_plug[name][out]
                if plug is None:
                    right.append(undefined)
                    incomplete = True
                else:
                    right.append((plug, "(overall_analysis)"))
            show_table(left, right)
            if incomplete:
                stop('(X) Please define all output analysis in "overall_analysis".')

        # Output paths
        print(colored("> Output Path(s) :", "blue"))
        if not self.outputs_by_process[name]:
            print(colored("None.", "blue"))
        else:
            left, right, incomplete = [], [], False
            for path in self.outputs_by_process[name]:
                left.append((path,))
                plug = self.outputs_plug[name][path]
                if plug is None:
                    right.append(undefined)
                    incomplete = True
                else:
                    right.append((plug, "(output_paths)"))
            show_table(left, right)
            if incomplete:
                stop('(X) Please define all output paths in "output_paths".')
def set_user_params_types(self) -> None:
    """Record the type of every user parameter in ``self.params_type``.

    Only inputs plugged through ``"user_params"`` are considered. Inputs
    that are not plugged at all (``None``) are skipped instead of raising
    ``TypeError``. Inputs plugged through ``"hard_params"`` are skipped
    too: their plug holds a literal value, which must not be mistaken for
    a user parameter name when it happens to equal one.

    When the same user parameter feeds inputs of different types, an error
    is printed and the program exits with status 1.
    """
    for proc, params in self.params_by_process.items():
        for param, type_info in params.items():

            plug = self.params_plug[proc][param]
            if plug is None or plug[1] != "user_params":
                continue

            user_param = plug[0]
            if user_param not in self.user_params:
                continue

            # ``type_info`` is ``[type object, printable type name]``
            if (user_param in self.params_type) and (self.params_type[user_param][0] != type_info[0]):
                print()
                print(colored(f"(X) {user_param} is defined both as ({self.params_type[user_param][1]}) and ({type_info[1]}) :", "red"))
                print(colored('> Please consider defining a new user parameter in "user_params".', "red"))
                sys.exit(1)

            self.params_type[user_param] = type_info
def print_io(self) -> None:
    """Print the user parameters, user paths and output paths.

    Each list is printed one entry per line; an empty list is reported
    as ``None.``.
    """

    def show(entries: list) -> None:
        # One blue line per entry, or "None." when there is nothing to show
        for entry in entries:
            print(colored(entry, "blue"))
        if not entries:
            print(colored("None.", "blue"))

    print()
    print(colored("> INPUTS <", "blue", attrs=["reverse"]))

    # User parameters with their printable type
    print()
    print(colored("| User Parameters |", "magenta"))
    show([f"> {param} ({spec[1]})" for param, spec in self.params_type.items()])

    # User paths
    print()
    print(colored("| User Paths |", "magenta"))
    show([f"> {path}" for path in self.user_paths])

    # Output paths
    print()
    print(colored("> OUTPUTS <", "blue", attrs=["reverse"]))
    print()
    show([f"> {path}" for path in self.output_paths])
def define_studies(self) -> None:
    """Load ``studies.json`` from the working directory, creating it if needed.

    A missing file is created with an empty ``"studies"`` list and an empty
    ``"config"`` mapping. When no study is declared, an error pointing to
    the file is printed and the program exits with status 1; otherwise the
    declared studies are stored in ``self.studies``.
    """
    self.studies_file = self.working_dir / "studies.json"

    if not self.studies_file.exists():
        # First run: write an empty skeleton
        self.dict_studies["studies"] = []
        self.dict_studies["config"] = {}
        with open(self.studies_file, "w") as stream:
            json.dump(self.dict_studies, stream, indent=4)
    else:
        with open(self.studies_file) as stream:
            self.dict_studies = json.load(stream)

    print()
    print(colored("> STUDIES <", "blue", attrs=["reverse"]))

    declared = self.dict_studies["studies"]
    if len(declared) == 0:
        print()
        print(colored("(X) Please declare at least one study in file :", "red"))
        print(colored(f"> {self.studies_file}", "red"))
        sys.exit(1)

    self.studies = declared
def init_studies(self) -> None:
    """Synchronise the ``"config"`` section of the studies with the workflow.

    Configurations of undeclared studies are removed, as are entries for
    parameters, paths or outputs that no longer exist. Missing entries are
    then added: user parameters and paths default to ``False`` for the
    study named ``"Default"`` and to ``None`` otherwise, and
    ``"clean_outputs"`` entries default to ``False``. Parameters and paths
    are reordered to follow the workflow order, and the result is written
    to the studies file.
    """
    config = self.dict_studies["config"]

    # Allowed names for each section of a study configuration
    allowed_by_section = (
        ("user_params", self.user_params),
        ("user_paths", self.user_paths),
        ("clean_outputs", self.output_paths),
    )

    # Clean: undeclared studies first, then stale entries of the others
    for study in list(config):
        if study not in self.studies:
            del config[study]
    for section, allowed in allowed_by_section:
        for study in list(config):
            for entry in list(config[study][section]):
                if entry not in allowed:
                    del config[study][section][entry]

    # Initialise missing studies and missing entries
    for study in self.studies:

        if study not in config:
            config[study] = {
                "execute": True,
                "user_params": {},
                "user_paths": {},
                "clean_outputs": {},
            }
        study_config = config[study]

        unset = False if study == "Default" else None
        for param in self.user_params:
            study_config["user_params"].setdefault(param, unset)
        for file in self.user_paths:
            study_config["user_paths"].setdefault(file, unset)
        for path in self.output_paths:
            study_config["clean_outputs"].setdefault(path, False)

        # Reorder following the workflow
        study_config["user_params"] = {
            key: study_config["user_params"][key] for key in self.user_params
        }
        study_config["user_paths"] = {
            key: study_config["user_paths"][key] for key in self.user_paths
        }

    # Write studies json file
    with open(self.studies_file, "w") as stream:
        json.dump(self.dict_studies, stream, indent=4)
766 def test_studies_modification(self) -> None:
768 # Loop over studies
769 for study in self.studies:
771 self.studies_modif[study] = False
773 study_file = Path(study) / ".study.json"
774 if study_file.exists():
775 with open(study_file) as f:
776 dict_study = json.load(f)
777 if (self.dict_studies["config"][study]["user_params"] != dict_study["user_params"]) or \
778 (self.dict_studies["config"][study]["user_paths"] != dict_study["user_paths"]):
779 self.studies_modif[study] = True
781 def test_studies_settings(self) -> None:
783 # Loop over studies
784 for study in self.studies:
786 self.studies_messages[study] = []
787 self.studies_config[study] = True
789 for param in self.user_params:
790 if self.dict_studies["config"][study]["user_params"][param] is None:
791 self.studies_messages[study].append(f"(X) {param} not configured.")
792 self.studies_config[study] = False
793 else:
794 if self.dict_studies["config"][study]["user_params"][param]:
795 text = "variable"
796 else:
797 text = "fixed"
798 self.studies_messages[study].append(f"(V) {param} is {text}.")
800 for file in self.user_paths:
801 if self.dict_studies["config"][study]["user_paths"][file] is None:
802 self.studies_messages[study].append(f"(X) {file} not configured.")
803 self.studies_config[study] = False
804 else:
805 if self.dict_studies["config"][study]["user_paths"][file]:
806 text = "variable"
807 else:
808 text = "fixed"
809 self.studies_messages[study].append(f"(V) {file} is {text}.")
def print_studies(self) -> None:
    """Print the configuration status of every study.

    A study flagged in ``self.studies_modif`` gets a warning, its output
    tree is cleaned with ``clean_output_tree`` and its ``analysis.json``
    (relative to the current directory) is deleted. The messages of the
    study are then printed, green for ``(V)`` and red for ``(X)``. The
    first study that is not fully configured stops the program with
    status 1.
    """
    marker_colors = (("(V)", "green"), ("(X)", "red"))

    for study in self.studies:

        print()
        print(colored(f"| {study} |", "magenta"))

        if self.studies_modif[study]:
            print(colored("(!) Configuration has been modified.", "yellow"))
            self.clean_output_tree(study)

            # Delete analysis file
            analysis_file = Path(study) / "analysis.json"
            if analysis_file.exists():
                analysis_file.unlink()

        # Messages carrying neither marker are not printed
        for message in self.studies_messages[study]:
            for marker, color in marker_colors:
                if marker in message:
                    print(colored(message, color))
                    break

        if not self.studies_config[study]:
            print()
            print(colored("(X) Please configure file :", "red"))
            print(colored(f"> {Path.cwd() / 'studies.json'}", "red"))
            sys.exit(1)
def init_process_settings(self) -> None:
    """Synchronise ``<study>/process.json`` with the processes of the workflow.

    For each study the existing file (if any) is loaded, settings of
    processes that are no longer in the workflow are dropped, and missing
    processes get ``{"execute": True, "silent": self.silent}``. Entries are
    ordered like ``self.list_processes`` and written back to the file.
    """
    for study in self.studies:

        # Load the stored settings when the file exists
        process_file = Path(study) / "process.json"
        if process_file.exists():
            with open(process_file) as stream:
                stored = json.load(stream)
        else:
            stored = {}

        # Keep known processes, add defaults for new ones, in workflow order
        self.dict_process[study] = {
            process: (
                stored[process]
                if process in stored
                else {"execute": True, "silent": self.silent}
            )
            for process in self.list_processes
        }

        # Write process json file
        with open(process_file, "w") as stream:
            json.dump(self.dict_process[study], stream, indent=4)
def configure_inputs(self) -> None:
    """Split the user parameters and paths of each study into fixed/variable.

    An entry is variable only when its configured value is exactly
    ``True``; any other value makes it fixed. The resulting name lists are
    stored per study in ``self.fixed_params``, ``self.variable_params``,
    ``self.fixed_paths`` and ``self.variable_paths``.
    """
    for study in self.studies:

        study_config = self.dict_studies["config"][study]
        params = study_config["user_params"]
        paths = study_config["user_paths"]

        self.fixed_params[study] = [
            key for key, flag in params.items() if flag is not True
        ]
        self.variable_params[study] = [
            key for key, flag in params.items() if flag is True
        ]
        self.fixed_paths[study] = [
            key for key, flag in paths.items() if flag is not True
        ]
        self.variable_paths[study] = [
            key for key, flag in paths.items() if flag is True
        ]
def init_data_tree(self) -> None:
    """Create or update the on-disk input tree of every study.

    For each study, under ``self.working_dir / study``:

    * ``.study.json`` is rewritten with the current study configuration;
    * ``inputs.csv`` is created or updated when the study has variable
      parameters or variable paths, and deleted otherwise; its index is
      stored in ``self.dict_datasets[study]``;
    * ``inputs.json`` is created or updated when the study has fixed
      parameters, fixed paths or variable paths, and deleted otherwise;
      its content is stored in ``self.dict_inputs[study]``;
    * ``0_inputs`` (and ``0_inputs/0_datasets/<index>``) is created and
      pruned of entries that are not expected, or deleted when the
      workflow has no user paths.

    Finally, every directory of the working directory that is not a
    declared study is deleted.
    """

    # Loop over studies
    for study in self.studies:

        # Initialize study directory
        study_dir: Path = self.working_dir / study
        study_dir.mkdir(
            exist_ok=True,
            parents=True,
        )

        # Write study json file (snapshot of the current configuration)
        with open(study_dir / ".study.json", "w") as f:
            json.dump(self.dict_studies["config"][study], f, indent=4)

        # Initialize inputs csv
        inputs_file: Path = study_dir / "inputs.csv"
        if (len(self.variable_params[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            if not inputs_file.exists():

                # Create empty input dataframe
                df_inputs = pd.DataFrame(columns=["ID"] + self.variable_params[study] + ["EXECUTE"])

                # Write input dataframe ("ID" is a regular column here, so
                # the default index is not written)
                df_inputs.to_csv(
                    path_or_buf=inputs_file,
                    index=False,
                )

            else:

                # Read input dataframe (first column becomes the index)
                df_inputs = pd.read_csv(
                    filepath_or_buffer=inputs_file,
                    index_col=0,
                )

                # Update variable parameters: add the missing columns as NaN,
                # then keep only the current parameters followed by "EXECUTE"
                df_inputs = df_inputs.assign(**{param: np.nan for param in self.variable_params[study] if param not in df_inputs.columns})
                df_inputs = df_inputs[[col for col in self.variable_params[study] if col in df_inputs.columns] + ["EXECUTE"]]

                # Set default execution (missing values become 1)
                df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int)

                # Write input dataframe
                df_inputs.to_csv(
                    path_or_buf=inputs_file,
                )

            # Define list of datasets
            self.dict_datasets[study] = df_inputs.index.tolist()

        # Delete file
        elif inputs_file.exists():
            inputs_file.unlink()

        # Initialize inputs json file
        inputs_file: Path = study_dir / "inputs.json"
        if (len(self.fixed_params[study]) > 0) or \
           (len(self.fixed_paths[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            # Create file
            if not inputs_file.exists():

                # Initialize dictionary
                dict_inputs = {}
                if len(self.fixed_params[study]) > 0:
                    for param in self.fixed_params[study]:
                        dict_inputs[param] = None
                if len(self.fixed_paths[study]) > 0:
                    for path in self.fixed_paths[study]:
                        dict_inputs[path] = None
                if len(self.variable_paths[study]) > 0:
                    # One entry per dataset for each variable path
                    # NOTE(review): the index values are used as JSON keys;
                    # presumably they are strings - confirm.
                    for path in self.variable_paths[study]:
                        dict_inputs[path] = {}
                        for index in df_inputs.index:
                            dict_inputs[path][index] = None

                # Write json
                with open(inputs_file, "w") as f:
                    json.dump(dict_inputs, f, indent=4)

            # Update file
            else:

                # Read inputs json
                with open(inputs_file) as f:
                    dict_inputs = json.load(f)

                # Update fixed parameters
                dict_fixed_params = {k: dict_inputs.get(k, None) for k in self.fixed_params[study]}

                # Update fixed paths (a mapping left over from a formerly
                # variable path is reset to None)
                dict_fixed_paths = {}
                for path in self.fixed_paths[study]:
                    value = dict_inputs.get(path, None)
                    if isinstance(value, dict):
                        dict_fixed_paths[path] = None
                    else:
                        dict_fixed_paths[path] = value

                # Update variable paths (a non-mapping value left over from a
                # formerly fixed path is discarded)
                dict_variable_paths = {}
                for path in self.variable_paths[study]:
                    existing_values = dict_inputs.get(path, {})
                    if not isinstance(existing_values, dict):
                        existing_values = {}
                    dict_variable_paths[path] = {
                        idx: existing_values.get(idx)
                        for idx in df_inputs.index
                    }

                # Update inputs dictionary
                dict_inputs = {**dict_fixed_params, **dict_fixed_paths, **dict_variable_paths}

                # Write inputs json
                with open(inputs_file, "w") as f:
                    json.dump(dict_inputs, f, indent=4)

            self.dict_inputs[study] = dict_inputs

        else:

            # Delete file
            if inputs_file.exists():
                inputs_file.unlink()

            self.dict_inputs[study] = {}

        # Initialize inputs directory
        inputs_dir: Path = study_dir / "0_inputs"
        if len(self.user_paths) > 0:

            # Create inputs directory (if necessary)
            inputs_dir.mkdir(
                exist_ok=True,
                parents=True,
            )

            # Delete fixed paths (if necessary): everything that is neither a
            # current fixed path nor the datasets folder
            input_paths = [f for f in inputs_dir.iterdir()]
            for path in input_paths:
                resolved_path = path.resolve().name
                if (resolved_path not in self.fixed_paths[study]) and (resolved_path != "0_datasets"):
                    if Path(path).is_file():
                        path.unlink()
                    else:
                        shutil.rmtree(path)

            # Update inputs subfolders for variable paths
            datasets_dir: Path = inputs_dir / "0_datasets"
            if len(self.variable_paths[study]) > 0:

                # Create datasets directory (if necessary)
                datasets_dir.mkdir(
                    exist_ok=True,
                    parents=True,
                )

                # Create subfolders (if necessary)
                # NOTE(review): `datasets_dir / index` assumes the index
                # values are strings - confirm.
                for index in df_inputs.index:

                    inputs_subfolder: Path = datasets_dir / index
                    inputs_subfolder.mkdir(
                        exist_ok=True,
                        parents=True,
                    )

                    # Delete variable paths (if necessary)
                    input_paths = [f for f in inputs_subfolder.iterdir()]
                    for path in input_paths:
                        resolved_path = path.resolve().name
                        if resolved_path not in self.variable_paths[study]:
                            if Path(path).is_file():
                                path.unlink()
                            else:
                                shutil.rmtree(path)

                # Delete subfolders (if necessary): datasets that are no
                # longer listed in the inputs csv
                inputs_subfolders = [f for f in datasets_dir.iterdir() if f.is_dir()]
                for folder in inputs_subfolders:
                    id = os.path.split(folder)[-1]
                    if id not in self.dict_datasets[study]:
                        shutil.rmtree(folder)

            # Delete datasets folder (if necessary)
            elif datasets_dir.exists():
                shutil.rmtree(datasets_dir)

        # Delete inputs directory (if necessary)
        elif inputs_dir.exists():
            shutil.rmtree(inputs_dir)

    # Delete useless study directories
    studies_folders = [f for f in self.working_dir.iterdir() if f.is_dir()]
    for folder in studies_folders:
        if os.path.split(folder)[-1] not in self.studies:
            shutil.rmtree(folder)
def clean_output_tree(self, study: str) -> None:
    """Remove the generated content of a study while keeping its inputs.

    Every sub-directory of the study directory except ``0_inputs`` is deleted
    recursively, then the ``.paths.json`` file is removed when it exists.
    Regular files other than ``.paths.json`` are left untouched.

    Args:
        study: Name of the study; its directory is ``self.working_dir / study``.
    """
    root: Path = self.working_dir / study

    # Take a snapshot of the removable directories before deleting anything
    removable = [
        entry
        for entry in root.iterdir()
        if entry.is_dir() and entry.name != "0_inputs"
    ]
    for entry in removable:
        shutil.rmtree(entry)

    # Forget the recorded output paths
    record = root / ".paths.json"
    if record.exists():
        record.unlink()
def set_inputs(self) -> None:
    """Load the parameters and input paths of every study.

    For each study of ``self.studies`` the method fills:

    * ``self.dict_fixed_params[study]``: the fixed parameters that are
      present in ``self.dict_inputs[study]``;
    * ``self.dict_variable_params[study]``: the content of the study's
      ``inputs.csv`` (first column used as index) when the study declares
      variable parameters or variable paths, an empty dataframe otherwise;
    * ``self.dict_user_paths[study]``: for each fixed path a string, and for
      each variable path a ``{index: string}`` mapping. The string is the
      value found in ``self.dict_inputs[study]`` or, when that value is
      ``None``, the default location under ``0_inputs`` (respectively
      ``0_inputs/0_datasets/<index>``) of the study directory.

    The process working directory is moved into each study directory while it
    is handled and is set to ``self.working_dir`` afterwards, including when
    an error is raised.
    """
    for study in self.studies:

        study_dir: Path = self.working_dir / study

        # "inputs.csv" and the default input locations are resolved from here
        os.chdir(study_dir)
        try:
            cwd = Path(os.getcwd())
            self.dict_user_paths[study] = {}

            # Fixed parameters
            if len(self.fixed_params[study]) > 0:
                data = self.dict_inputs[study]
                self.dict_fixed_params[study] = {
                    k: data[k] for k in self.fixed_params[study] if k in data
                }
            else:
                self.dict_fixed_params[study] = {}

            # Variable parameters
            if (len(self.variable_params[study]) > 0) or \
               (len(self.variable_paths[study]) > 0):
                self.dict_variable_params[study] = pd.read_csv(
                    filepath_or_buffer="inputs.csv",
                    index_col=0,
                )
            else:
                self.dict_variable_params[study] = pd.DataFrame()

            # Fixed paths
            for file in self.fixed_paths[study]:
                given = self.dict_inputs[study][file]
                if given is None:
                    given = str(cwd / "0_inputs" / file)
                self.dict_user_paths[study][file] = given

            # Variable paths
            if len(self.variable_paths[study]) > 0:

                # The dataframe was loaded just above (the condition there
                # covers variable paths), so the file is not parsed again.
                df_inputs = self.dict_variable_params[study]

                for file in self.variable_paths[study]:
                    per_dataset = {}
                    for idx in df_inputs.index:
                        given = self.dict_inputs[study][file][idx]
                        if given is None:
                            given = str(cwd / "0_inputs" / "0_datasets" / idx / file)
                        per_dataset[idx] = given
                    self.dict_user_paths[study][file] = per_dataset

        finally:
            # Never leave the process inside a study directory
            os.chdir(self.working_dir)
1189 def test_inputs_settings(self) -> None:
1191 # Loop over studies
1192 for study in self.studies:
1194 # Define study directory
1195 study_dir: Path = self.working_dir / study
1197 # Go to study directory
1198 os.chdir(study_dir)
1200 self.fixed_params_messages[study] = []
1201 self.fixed_paths_messages[study] = []
1202 self.fixed_params_config[study] = True
1203 self.fixed_paths_config[study] = True
1204 self.variable_params_messages[study] = {}
1205 self.variable_paths_messages[study] = {}
1206 self.variable_params_config[study] = {}
1207 self.variable_paths_config[study] = {}
1209 # Fixed parameters
1210 for param, value in self.dict_fixed_params[study].items():
1211 if value is None:
1212 self.fixed_params_messages[study].append(f"(X) {param}")
1213 self.fixed_params_config[study] = False
1214 elif not isinstance(value, self.params_type[param][0]):
1215 self.fixed_params_messages[study].append(f"(!) {param} ({self.params_type[param][1]} expected)")
1216 else:
1217 self.fixed_params_messages[study].append(f"(V) {param}")
1219 # Fixed paths
1220 for file in self.fixed_paths[study]:
1221 file_path: Path = Path(self.dict_user_paths[study][file])
1222 if not file_path.exists():
1223 self.fixed_paths_messages[study].append(f"(X) {file}")
1224 self.fixed_paths_config[study] = False
1225 else:
1226 self.fixed_paths_messages[study].append(f"(V) {file}")
1228 # Variable inputs
1229 if (len(self.variable_params[study]) > 0) or \
1230 (len(self.variable_paths[study]) > 0):
1232 for index in self.dict_variable_params[study].index:
1234 self.variable_params_messages[study][index] = []
1235 self.variable_paths_messages[study][index] = []
1236 self.variable_params_config[study][index] = True
1237 self.variable_paths_config[study][index] = True
1239 # Variable parameters
1240 for param in self.variable_params[study]:
1241 value = self.dict_variable_params[study].at[index, param]
1242 if pd.isna(value) or value == "":
1243 self.variable_params_messages[study][index].append(f"(X) {param}")
1244 self.variable_params_config[study][index] = False
1245 else:
1246 if isinstance(value, (np.integer, np.floating, np.bool_)):
1247 value = value.item()
1248 if not isinstance(value, self.params_type[param][0]):
1249 self.variable_params_messages[study][index].append(f"(!) {param} ({self.params_type[param][1]} expected)")
1250 else:
1251 self.variable_params_messages[study][index].append(f"(V) {param}")
1253 # Variable paths
1254 for file in self.variable_paths[study]:
1255 file_path: Path = Path(self.dict_user_paths[study][file][index])
1256 if not file_path.exists():
1257 self.variable_paths_messages[study][index].append(f"(X) {file}")
1258 self.variable_paths_config[study][index] = False
1259 else:
1260 self.variable_paths_messages[study][index].append(f"(V) {file}")
1262 # Go back to working directory
1263 os.chdir(self.working_dir)
def print_inputs_settings(self) -> None:
    """Print the status of every study input and exit on invalid settings.

    For each study, the messages recorded in ``self.fixed_*_messages`` and
    ``self.variable_*_messages`` are printed on one line per group
    ("Common" for fixed inputs, then one line per experiment index), coloured
    green for ``(V)``, red for ``(X)`` and yellow for ``(!)``.

    The program is terminated with ``sys.exit(1)`` after printing an
    explanatory message when, for a study:

    * a fixed input is flagged as not configured;
    * a fixed parameter has an unexpected type;
    * variable inputs are declared but the inputs dataframe has no row;
    * a variable input is missing;
    * a variable parameter has an unexpected type.

    The working directory is moved into each study directory while it is
    reported and set to ``self.working_dir`` once the study passed all checks.
    """

    def _abort(headline: str, details: list) -> None:
        # Print a red headline, the already coloured detail lines, then exit
        print()
        print(colored(headline, "red"))
        for detail in details:
            print(detail)
        sys.exit(1)

    print()
    print(
        colored("> SETTINGS <", "blue", attrs=["reverse"]),
    )

    for study in self.studies:

        # Work from the study directory so reported files are absolute
        os.chdir(self.working_dir / study)
        here = Path.cwd()

        print()
        print(colored(f"| {study} |", "magenta"))

        # ------------ #
        # Fixed inputs #
        # ------------ #
        line = [colored("> Common :", "blue")]
        to_fix = []
        json_listed = False
        wrong_type = False

        # Fixed parameters
        for message in self.fixed_params_messages[study]:
            if "(V)" in message:
                line.append(colored(message, "green"))
            elif "(X)" in message:
                line.append(colored(message, "red"))
                # The json file is reported once, whatever the number of misses
                if not json_listed:
                    to_fix.append(colored(f"> {here / 'inputs.json'}", "red"))
                    json_listed = True
            elif "(!)" in message:
                line.append(colored(message, "yellow"))
                wrong_type = True

        # Fixed paths (messages are in the same order as the declared paths)
        for position, message in enumerate(self.fixed_paths_messages[study]):
            if "(V)" in message:
                line.append(colored(message, "green"))
            elif "(X)" in message:
                missing = self.dict_user_paths[study][self.fixed_paths[study][position]]
                line.append(colored(message, "red"))
                to_fix.append(colored(f"> {missing}", "red"))

        # Only the header present means the study has no fixed input
        if len(line) == 1:
            print(colored("None.", "blue"))
        else:
            print(*line)

        if not (self.fixed_params_config[study] and self.fixed_paths_config[study]):
            _abort("(X) Please set inputs :", to_fix)

        if wrong_type:
            _abort(
                "(X) Please set parameter(s) with expected type(s) in file :",
                [colored(f"> {here / 'inputs.json'}", "red")],
            )

        # --------------- #
        # Variable inputs #
        # --------------- #
        has_variable_inputs = (
            len(self.variable_params[study]) > 0
            or len(self.variable_paths[study]) > 0
        )
        if has_variable_inputs:

            experiments = self.dict_variable_params[study].index

            # At least one experiment must be declared
            if len(experiments) == 0:
                _abort(
                    "(X) Please declare at least one experiment in file :",
                    [colored(f"> {here / 'inputs.csv'}", "red")],
                )

            to_fix = []
            csv_listed = False
            wrong_type = False

            for index in experiments:

                line = [colored(f"> {index} :", "blue")]

                # Variable parameters
                for message in self.variable_params_messages[study][index]:
                    if "(V)" in message:
                        line.append(colored(message, "green"))
                    elif "(X)" in message:
                        line.append(colored(message, "red"))
                        # The csv file is reported once for all experiments
                        if not csv_listed:
                            to_fix.append(colored(f"> {here / 'inputs.csv'}", "red"))
                            csv_listed = True
                    elif "(!)" in message:
                        line.append(colored(message, "yellow"))
                        wrong_type = True

                # Variable paths
                for position, message in enumerate(self.variable_paths_messages[study][index]):
                    if "(V)" in message:
                        line.append(colored(message, "green"))
                    elif "(X)" in message:
                        missing = self.dict_user_paths[study][self.variable_paths[study][position]][index]
                        line.append(colored(message, "red"))
                        to_fix.append(colored(f"> {missing}", "red"))

                print(*line)

            # Entries mentioning the csv file come first, others keep their order
            to_fix.sort(key=lambda entry: "inputs.csv" not in entry)
            if to_fix:
                _abort("(X) Please set inputs :", to_fix)

            if wrong_type:
                _abort(
                    "(X) Please set parameter(s) with expected type(s) in file :",
                    [colored(f"> {here / 'inputs.csv'}", "red")],
                )

        # Go back to working directory
        os.chdir(self.working_dir)
def init_paths(self) -> None:
    """Initialise the dictionary of output paths of every study.

    For each study, ``self.dict_paths[study]`` is loaded from the study's
    ``.paths.json`` file when it exists; otherwise it maps every entry of
    ``self.output_paths`` to ``None``. In values that are dictionaries, keys
    that are not in ``self.dict_datasets[study]`` are removed.
    """
    for study in self.studies:

        record = self.working_dir / study / ".paths.json"

        if record.exists():
            with open(record) as stream:
                known_paths = json.load(stream)
        else:
            known_paths = dict.fromkeys(self.output_paths)

        # Purge entries of datasets that are no longer declared
        for per_dataset in known_paths.values():
            if not isinstance(per_dataset, dict):
                continue
            stale = [
                name
                for name in per_dataset
                if name not in self.dict_datasets[study]
            ]
            for name in stale:
                del per_dataset[name]

        self.dict_paths[study] = known_paths
def update_analysis(self) -> None:
    """Synchronise the ``analysis.json`` file of every study with its datasets.

    For each study, ``self.dict_analysis[study]`` is loaded from the study's
    ``analysis.json`` (empty dictionary when the file does not exist). Then,
    for every process of ``self.settings_by_process``:

    * a dataset of ``self.dict_datasets[study]`` that has no entry yet
      receives its own copy of the process settings;
    * an entry whose dataset is no longer in ``self.dict_datasets[study]``
      is removed.

    The resulting dictionary is written back to ``analysis.json``.
    """
    # Function-scope import: only this method needs it
    import copy

    for study in self.studies:

        analysis_file = self.working_dir / study / "analysis.json"

        # Initialize analysis dictionary
        if analysis_file.exists():
            with open(analysis_file) as f:
                self.dict_analysis[study] = json.load(f)
        else:
            self.dict_analysis[study] = {}

        for proc, settings in self.settings_by_process.items():

            per_dataset = self.dict_analysis[study].setdefault(proc, {})

            # Add missing datasets. Each one gets an independent copy so that
            # editing the settings of a dataset in place cannot alter the
            # other datasets, the other studies or the default settings.
            for dataset in self.dict_datasets[study]:
                if dataset not in per_dataset:
                    per_dataset[dataset] = copy.deepcopy(settings)

            # Delete useless datasets (names collected first: the dictionary
            # cannot change size while it is iterated)
            stale = [
                dataset
                for dataset in per_dataset
                if dataset not in self.dict_datasets[study]
            ]
            for dataset in stale:
                del per_dataset[dataset]

        with open(analysis_file, "w") as f:
            json.dump(self.dict_analysis[study], f, indent=4)
def clean_outputs(self) -> None:
    """Delete the outputs flagged for cleaning in the studies configuration.

    For every study of ``self.dict_studies["config"]`` and every key of its
    ``"clean_outputs"`` mapping whose value is truthy, the matching entry of
    ``self.dict_paths[study]`` is removed from disk:

    * a string is treated as one file or directory;
    * a dictionary is treated as one file or directory per value.

    Paths that do not exist are ignored, and so are ``None`` entries, whether
    they stand for the whole output or for a single value of a dictionary.
    """

    def _remove_output(output: str) -> None:
        # Remove an output path, either file or directory, when it exists
        output_path = Path(output)
        if output_path.exists():
            if output_path.is_dir():
                shutil.rmtree(output)
            else:
                output_path.unlink()

    for study, study_dict in self.dict_studies["config"].items():
        for key, requested in study_dict["clean_outputs"].items():
            if not requested:
                continue

            target = self.dict_paths[study][key]
            if isinstance(target, str):
                _remove_output(target)
            elif isinstance(target, dict):
                for dataset_output in target.values():
                    # A dataset without recorded output has nothing to delete
                    if dataset_output is not None:
                        _remove_output(dataset_output)
def purge_output_datasets(self, study: str) -> None:
    """Delete the entries of the current directory that are not datasets.

    Every entry of the current working directory whose resolved name is not
    in ``self.dict_datasets[study]`` is deleted: files are unlinked and
    directories are removed recursively.

    Args:
        study: Name of the study whose datasets must be kept.
    """
    # Snapshot the listing first: entries are deleted while looping
    for entry in list(Path.cwd().iterdir()):
        if entry.resolve().name in self.dict_datasets[study]:
            continue
        if entry.is_file():
            # shutil.rmtree only accepts directories
            entry.unlink()
        else:
            shutil.rmtree(entry)
def update_workflow_diagram(self, process: Process) -> None:
    """Record the inputs and outputs of a process in the workflow diagram.

    The entry ``self.diagram[process.name]`` is replaced by a dictionary
    holding the values of ``process.params``, ``process.paths``,
    ``process.required_paths`` and ``process.output_paths`` as lists, along
    with ``process.allparams`` and ``process.allpaths`` as they are.

    Args:
        process: Process whose description is stored under its ``name``.
    """
    snapshot = {}
    snapshot["params"] = list(process.params.values())
    snapshot["allparams"] = process.allparams
    snapshot["paths"] = list(process.paths.values())
    snapshot["allpaths"] = process.allpaths
    snapshot["required_paths"] = list(process.required_paths.values())
    snapshot["output_paths"] = list(process.output_paths.values())

    self.diagram[process.name] = snapshot
def __call__(self) -> None:
    """Run the workflow: every process, for every study flagged for execution.

    For each study of ``self.dict_studies["config"]`` whose ``"execute"``
    value is truthy, the entries of ``self.list_workflow`` are handled in
    order. For each of them:

    * ``self.update_analysis()`` is called;
    * the class stored under ``"process"`` is instantiated and initialised
      from inside its own folder ``<step>_<ClassName>`` of the study
      directory;
    * unless the process is flagged as not executed in ``self.dict_process``,
      it is called and finalised, once per index of its ``df_params`` when
      ``is_case`` is true (each run in a sub-folder named after the index,
      rows whose ``EXECUTE`` value equals 0 being skipped), once otherwise;
    * the workflow diagram and the output paths are updated and written to
      ``.diagram.json`` and ``.paths.json`` in the study directory.

    Once all studies are handled, ``self.clean_outputs()`` is called.
    """
    # --------------- #
    # Launch workflow #
    # --------------- #
    print()
    print(
        colored("> RUNNING <", "blue", attrs=["reverse"]),
    )

    optional_keys = (
        "hard_params",
        "user_params",
        "user_paths",
        "required_paths",
        "output_paths",
        "overall_analysis",
    )

    for study, dict_study in self.dict_studies["config"].items():

        # Studies that must not be executed are only reported
        if not dict_study["execute"]:
            print()
            print(
                colored(f"| {study} |", "magenta"),
            )
            print()
            print(colored("(!) Study is skipped.", "yellow"))
            continue

        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        for step, proc in enumerate(self.list_workflow):

            self.update_analysis()

            # Optional workflow entries default to an empty dictionary
            options = {key: proc.get(key, {}) for key in optional_keys}

            # Settings (execute / silent) of the current process
            proc_settings = self.dict_process[study][self.list_processes[step]]

            # Define class object for the current process
            process = proc["process"]
            this_process: Process = process(
                study=study,
                df_user_params=self.dict_variable_params[study],
                dict_user_params=self.dict_fixed_params[study],
                dict_user_paths=self.dict_user_paths[study],
                dict_paths=self.dict_paths[study],
                params=options["user_params"],
                paths=options["user_paths"],
                dict_hard_params=options["hard_params"],
                fixed_params=self.fixed_params[study],
                variable_params=self.variable_params[study],
                fixed_paths=self.fixed_paths[study],
                variable_paths=self.variable_paths[study],
                required_paths=options["required_paths"],
                output_paths=options["output_paths"],
                overall_analysis=options["overall_analysis"],
                dict_analysis=self.dict_analysis[study],
                silent=proc_settings["silent"],
                diagram=self.diagram,
            )

            # The process is named after its class
            this_process.name = this_process.__class__.__name__

            # Working folder of the current process
            folder_name = f"{step + 1}_{this_process.name}"
            folder_path: Path = study_dir / folder_name
            folder_path.mkdir(exist_ok=True, parents=True)
            os.chdir(folder_path)

            this_process.initialize()

            # Processes that must not be executed are only reported
            if not proc_settings["execute"]:
                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )
                print()
                print(colored("(!) Process is skipped.", "yellow"))

                self.update_workflow_diagram(this_process)
                continue

            if this_process.is_case:

                # One run per ID of the inputs dataframe, each in its own sub-folder
                for idx in this_process.df_params.index:

                    print()
                    print(
                        colored(f"| {study} | {this_process.name} | {idx} |", "magenta"),
                    )

                    # Experiments that must not be executed are only reported
                    if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0:
                        print()
                        print(colored("(!) Experiment is skipped.", "yellow"))

                        os.chdir(folder_path)
                        self.purge_output_datasets(study)
                        self.update_workflow_diagram(this_process)
                        continue

                    this_process.index = idx

                    subfolder_path = study_dir / folder_name / str(idx)
                    subfolder_path.mkdir(exist_ok=True, parents=True)
                    os.chdir(subfolder_path)

                    # Launch process
                    this_process()
                    this_process.finalize()

                    # Back in the process folder, drop outdated dataset outputs
                    os.chdir(folder_path)
                    self.purge_output_datasets(study)

            else:

                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )

                # Launch process
                this_process()
                this_process.finalize()

            # Update workflow diagram
            self.update_workflow_diagram(this_process)

            # Update paths dictionary and write it next to the study
            self.dict_paths[study] = this_process.dict_paths
            with open(study_dir / ".paths.json", "w") as f:
                json.dump(self.dict_paths[study], f, indent=4)

            # Go back to study directory
            os.chdir(study_dir)

            # Write diagram json file
            with open(".diagram.json", "w") as f:
                json.dump(self.diagram, f, indent=4)

        # Go back to working directory
        os.chdir(self.working_dir)

    # Delete unnecessary outputs
    self.clean_outputs()