Coverage for C:\Users\julie\micromamba\envs\nrs-env\Lib\site-packages\nuremics\core\workflow.py: 82%
938 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-30 11:13 +0200
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-30 11:13 +0200
1from __future__ import annotations
3import os
4import pathlib
5import sys
7import json
8import shutil
9import numpy as np
10import pandas as pd
11from pathlib import Path
12from termcolor import colored
14from .process import Process
15from .utils import (
16 get_self_method_calls,
17 only_function_calls,
18 extract_inputs_and_types,
19 extract_analysis,
20 extract_self_output_keys,
21)
22from importlib.resources import files
25class WorkFlow:
26 """Manage workflow of processes."""
def __init__(
    self,
    app_name: str,
    nuremics_dir: str,
    workflow: list,
    silent: bool = False,
):
    """Create the workflow manager and synchronise the settings file.

    Parameters
    ----------
    app_name:
        Name of the application; used as key in the ``"apps"`` section of
        ``settings.json``.
    nuremics_dir:
        Directory in which the ``.nuremics`` folder is created.
    workflow:
        List of dictionaries, each holding a process class under the
        ``"process"`` key.
    silent:
        Stored as ``self.silent``.
    """

    # ---------------------- #
    # Plain state attributes #
    # ---------------------- #
    self.app_name = app_name
    self.list_workflow = workflow
    self.silent = silent

    # Containers filled later on; every attribute gets its own object.
    for list_attr in (
        "list_processes",
        "user_params",
        "user_paths",
        "output_paths",
        "overall_analysis",
    ):
        setattr(self, list_attr, [])

    for dict_attr in (
        "dict_inputs",
        "dict_datasets",
        "dict_studies",
        "dict_process",
        "dict_analysis",
        "analysis_settings",
        "params_type",
        "operations_by_process",
        "inputs_by_process",
        "params_by_process",
        "paths_by_process",
        "outputs_by_process",
        "analysis_by_process",
        "settings_by_process",
        "params_plug",
        "paths_plug",
        "outputs_plug",
        "analysis_plug",
        "studies_modif",
        "studies_messages",
        "studies_config",
        "fixed_params_messages",
        "fixed_params_config",
        "fixed_paths_messages",
        "fixed_paths_config",
        "variable_params_messages",
        "variable_params_config",
        "variable_paths_messages",
        "variable_paths_config",
        "fixed_params",
        "fixed_paths",
        "variable_params",
        "variable_paths",
        "dict_fixed_params",
        "dict_variable_params",
        "dict_user_paths",
        "dict_paths",
        "diagram",
    ):
        setattr(self, dict_attr, {})

    # --------------------------- #
    # ".nuremics" folder on disk  #
    # --------------------------- #
    self.nuremics_dir = Path(nuremics_dir) / ".nuremics"
    self.nuremics_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------ #
    # Settings file: create if absent, then load #
    # ------------------------------------------ #
    settings_file = self.nuremics_dir / "settings.json"
    if not settings_file.exists():
        with open(settings_file, "w") as stream:
            json.dump({"default_working_dir": None, "apps": {}}, stream, indent=4)

    with open(settings_file) as stream:
        self.dict_settings = json.load(stream)

    # -------------------------------------- #
    # Register the application if it is new  #
    # -------------------------------------- #
    apps = self.dict_settings["apps"]
    apps.setdefault(self.app_name, {"working_dir": None, "studies": []})

    # ------------------------------------------------------------- #
    # Default working directory: first one defined by any known app #
    # ------------------------------------------------------------- #
    if self.dict_settings["default_working_dir"] is None:
        first_defined = next(
            (app["working_dir"] for app in apps.values() if app["working_dir"] is not None),
            None,
        )
        if first_defined is not None:
            self.dict_settings["default_working_dir"] = first_defined

    # ---------------------------- #
    # Persist the updated settings #
    # ---------------------------- #
    with open(settings_file, "w") as stream:
        json.dump(self.dict_settings, stream, indent=4)

    # ------------------------------------ #
    # Names of the processes, in order     #
    # ------------------------------------ #
    self.list_processes.extend(entry["process"].__name__ for entry in self.list_workflow)
def print_logo(self):
    """Print the ASCII NUREMICS logo in yellow.

    The logo is read from the ``logo.txt`` resource of the
    ``nuremics.resources`` package. Trailing whitespace is stripped from
    every printed line and an empty line is printed before the logo.
    """

    ascii_logo_path = files("nuremics.resources").joinpath("logo.txt")

    # The context manager closes the handle (it used to be left open), and
    # reading everything at once also copes with an empty resource file.
    with ascii_logo_path.open("r") as f:
        # NOTE(review): the first line of the file is not displayed, exactly
        # as in the previous implementation -- confirm this is intended.
        lines = f.readlines()[1:]

    print()
    for line in lines:
        print(colored(line.rstrip(), "yellow"))
def print_application(self):
    """Print the application banner and the diagram of its processes.

    Each process class of the workflow is instantiated; its operations are
    stored in ``self.operations_by_process`` and its ``__call__`` is checked
    with ``only_function_calls``. Processes failing that check are flagged
    with ``(X)``; when at least one is flagged, a reminder of the expected
    ``__call__`` layout is printed after the diagram and the program exits
    with status 1.
    """

    # Banner
    print()
    print(colored("> APPLICATION <", "blue", attrs=["reverse"]))
    print()
    print(colored("| Workflow |", "magenta"))
    print(colored(f"{self.app_name}_____", "blue"))

    # Horizontal offset taken by the application name in the diagram
    app_pad = " " * (len(self.app_name) + 5)
    last_position = len(self.list_workflow) - 1

    has_invalid_process = False
    for position, entry in enumerate(self.list_workflow):

        proc_name = entry["process"].__name__
        instance: Process = entry["process"]()
        followed_by_another = position < last_position

        # Horizontal offset taken by the process name in the diagram
        proc_pad = " " * (len(proc_name) + 10)

        # Operations called by the process
        operations = get_self_method_calls(instance.__class__)
        self.operations_by_process[proc_name] = operations

        # The process call must be made of calls to its operations only
        valid_call = only_function_calls(
            method=instance.__call__,
            allowed_methods=operations,
        )

        branch = colored(app_pad + f"|_____{proc_name}_____", "blue")
        if not valid_call:
            print(branch + colored("(X)", "red"))
            has_invalid_process = True
        else:
            print(branch)
            # The vertical trunk is only continued when another process follows
            trunk = "|" if followed_by_another else " "
            for op_name in operations:
                print(colored(app_pad + trunk + proc_pad + f"|_____{op_name}", "blue"))

        if followed_by_another:
            print(colored(app_pad + "|", "blue"))

    if not has_invalid_process:
        return

    # Reminder of the expected layout (None stands for an empty line)
    reminder = (
        None,
        "(X) Each process must only call its internal function(s):",
        None,
        " def __call__(self):",
        " super().__call__()",
        None,
        " self.operation1()",
        " self.operation2()",
        " self.operation3()",
        " ...",
    )
    for text in reminder:
        if text is None:
            print()
        else:
            print(colored(text, "red"))
    sys.exit(1)
def set_working_directory(self):
    """Resolve, persist, create and enter the application working directory.

    When the application has no ``"working_dir"`` in the settings, the
    ``"default_working_dir"`` is proposed interactively. Without a default,
    or if the user declines it, the program exits with status 1. Otherwise
    ``self.working_dir`` is set to ``<working_dir>/<app_name>``, the settings
    file is rewritten, the directory is created and becomes the current
    working directory.
    """

    settings_file = self.nuremics_dir / "settings.json"
    app_settings = self.dict_settings["apps"][self.app_name]

    def abort_undefined():
        # Tell the user where to define the directory, then stop.
        print()
        print(colored(f'(X) Please define {self.app_name} "working_dir" in file :', "red"))
        print(colored(f"> {str(settings_file)}", "red"))
        sys.exit(1)

    # --------------------- #
    # Set working directory #
    # --------------------- #
    if app_settings["working_dir"] is None:

        default_dir = self.dict_settings["default_working_dir"]
        if default_dir is None:
            abort_undefined()

        print()
        print(colored(f'(!) Found "default_working_dir": {default_dir}', "yellow"))

        # Keep asking until the answer is an explicit yes (or empty) or no
        answer = None
        while answer not in ("y", "yes", ""):
            prompt = colored(f'Accept it as "working_dir" for {self.app_name}: [Y/n] ', "yellow")
            answer = input(prompt).strip().lower()
            if answer in ("n", "no"):
                abort_undefined()

        app_settings["working_dir"] = default_dir

    self.working_dir = Path(app_settings["working_dir"]) / self.app_name

    # ------------------- #
    # Write settings file #
    # ------------------- #
    with open(settings_file, "w") as stream:
        json.dump(self.dict_settings, stream, indent=4)

    # ---------------------------------- #
    # Create and enter working directory #
    # ---------------------------------- #
    self.working_dir.mkdir(parents=True, exist_ok=True)
    os.chdir(self.working_dir)
def get_inputs(self):
    """Collect the inputs of every process and how the workflow plugs them.

    For each workflow entry the process is instantiated and inspected with
    ``extract_inputs_and_types`` and ``extract_analysis``. Every input is
    then classified as an analysis, a path (subclass of ``pathlib.Path``) or
    a parameter, and the matching ``*_plug`` dictionary records where the
    workflow entry defines it (``None`` when it is not defined).
    """

    for entry in self.list_workflow:

        name = entry["process"].__name__
        instance: Process = entry["process"]()

        self.inputs_by_process[name] = extract_inputs_and_types(instance)
        self.analysis_by_process[name], self.settings_by_process[name] = extract_analysis(instance)

        self.params_by_process[name] = {}
        self.paths_by_process[name] = []
        self.params_plug[name] = {}
        self.paths_plug[name] = {}
        self.analysis_plug[name] = {}

        def declared(section, key):
            # True when the workflow entry defines `key` in `section`.
            return (section in entry) and (key in entry[section])

        for key, value_type in self.inputs_by_process[name].items():

            # Readable type label: builtins are shown without their module
            module_name = value_type.__module__
            type_name = value_type.__name__
            type_label = type_name if module_name == "builtins" else f"{module_name}.{type_name}"

            # Analysis inputs are plugged through "overall_analysis" only
            if key in self.analysis_by_process[name]:
                if declared("overall_analysis", key):
                    self.analysis_plug[name][key] = entry["overall_analysis"][key]
                else:
                    self.analysis_plug[name][key] = None
                continue

            # Paths and parameters each have two candidate sections,
            # searched in priority order
            if issubclass(value_type, pathlib.Path):
                self.paths_by_process[name].append(key)
                sections = ("user_paths", "required_paths")
                plug = self.paths_plug[name]
            else:
                self.params_by_process[name][key] = [value_type, type_label]
                sections = ("user_params", "hard_params")
                plug = self.params_plug[name]

            plug[key] = next(
                ([entry[section][key], section] for section in sections if declared(section, key)),
                None,
            )
def get_outputs(self):
    """Collect the output keys of every process and their workflow plugs.

    Output keys are gathered, without duplicates and in discovery order,
    from every operation of the process with ``extract_self_output_keys``.
    ``self.outputs_plug`` then maps each key to the name given in the
    ``"output_paths"`` section of the workflow entry, or ``None``.
    """

    for entry in self.list_workflow:

        name = entry["process"].__name__
        instance: Process = entry["process"]()

        found = []
        plugs = {}
        self.outputs_by_process[name] = found
        self.outputs_plug[name] = plugs

        # Gather output keys over all operations of the process
        for op in self.operations_by_process[name]:
            for key in extract_self_output_keys(getattr(instance, op)):
                if key not in found:
                    found.append(key)

        # Associate each key with its user-facing name (if any)
        has_section = "output_paths" in entry
        for key in found:
            if has_section and (key in entry["output_paths"]):
                plugs[key] = entry["output_paths"][key]
            else:
                plugs[key] = None
def init_config(self):
    """Validate the workflow definition and build the global lists.

    For every workflow entry, each optional section is checked against what
    the process actually exposes:

    * ``"user_params"`` / ``"hard_params"`` keys must be input parameters,
    * ``"user_paths"`` keys must be input paths,
    * ``"required_paths"`` values must already be in ``self.output_paths``
      (i.e. produced by a previous process),
    * ``"output_paths"`` keys must be outputs of the process and their values
      must be unique,
    * ``"overall_analysis"`` keys must be analysis inputs and their values
      must already be in ``self.output_paths``.

    Any violation prints a red ``(X)`` message and exits with status 1.
    On success ``self.user_params``, ``self.user_paths``,
    ``self.output_paths`` and ``self.overall_analysis`` are filled (without
    duplicates) and ``self.analysis_settings`` gathers the settings of each
    analysed output.
    """

    def abort(message):
        # Shared error exit: blank line, red message, status 1.
        print()
        print(colored(message, "red"))
        sys.exit(1)

    for process in self.list_workflow:

        name = process["process"].__name__

        # Define list of user parameters
        if "user_params" in process:
            for key, value in process["user_params"].items():
                if key not in self.params_by_process[name]:
                    abort(f'(X) {key} defined in "user_params" is not an input parameter of {name}.')
                self.user_params.append(value)

        # Check on hard parameters
        if "hard_params" in process:
            for key in process["hard_params"]:
                if key not in self.params_by_process[name]:
                    abort(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.')

        # Define list of user paths
        if "user_paths" in process:
            for key, value in process["user_paths"].items():
                if key not in self.paths_by_process[name]:
                    abort(f"(X) {key} is not an input path of {name}.")
                self.user_paths.append(value)

        # Check on required paths: only outputs registered so far qualify,
        # hence the dependency on the order of the workflow
        if "required_paths" in process:
            for value in process["required_paths"].values():
                if value not in self.output_paths:
                    abort(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".')

        # Define list of output paths
        if "output_paths" in process:
            for key, value in process["output_paths"].items():
                if key not in self.outputs_by_process[name]:
                    abort(f"(X) {key} is not an output path of {name}.")
                if value in self.output_paths:
                    abort(f'(X) {value} is defined twice in "output_paths".')
                self.output_paths.append(value)

        # Define list of outputs for analysis
        if "overall_analysis" in process:
            for key, value in process["overall_analysis"].items():
                if key not in self.analysis_by_process[name]:
                    abort(f"(X) {key} is not an output analysis of {name}.")
                self.overall_analysis.append(value)
                if value not in self.output_paths:
                    abort(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".')

    # Delete duplicates (order of first appearance is kept)
    self.user_params = list(dict.fromkeys(self.user_params))
    self.user_paths = list(dict.fromkeys(self.user_paths))
    self.overall_analysis = list(dict.fromkeys(self.overall_analysis))

    # Define analysis settings
    for output in self.overall_analysis:
        self.analysis_settings[output] = {}

    for proc, settings in self.settings_by_process.items():
        if not settings:
            continue
        for out, setting in settings.items():
            output = self.analysis_plug.get(proc, {}).get(out)
            # An analysis that is not plugged in "overall_analysis" has no
            # entry here (its plug is None). Skip it instead of raising
            # KeyError, so the missing definition can still be reported to
            # the user by the processes summary.
            if output not in self.analysis_settings:
                continue
            self.analysis_settings[output].update(setting)
def print_processes(self):
    """Print, for every process, how its inputs and outputs are plugged.

    Four tables are printed per process (input parameters, input paths,
    input analysis, output paths). The left side lists what the process
    expects, the right side what the workflow provides and from which
    section. A missing definition is shown in red with ``(X)``; after the
    table containing it has been printed, an explanatory message is shown
    and the program exits with status 1.
    """

    undefined = ("Not defined", "(X)")

    def render(left_rows, right_rows, missing, failure_message):
        # Print one aligned table; every column is padded to its widest
        # cell plus one space. Abort afterwards when a plug is missing.
        left_widths = [max(len(cell) for cell in column) + 1 for column in zip(*left_rows)]
        right_widths = [max(len(cell) for cell in column) + 1 for column in zip(*right_rows)]

        for left, right in zip(left_rows, right_rows):
            proc_str = "".join(cell.ljust(width) for cell, width in zip(left, left_widths)) + "-----|"
            user_str = "|----- " + "".join(cell.ljust(width) for cell, width in zip(right, right_widths))
            color = "red" if "(X)" in user_str else "green"
            print(colored(proc_str, "blue") + colored(user_str, color))

        if missing:
            print()
            print(colored(failure_message, "red"))
            sys.exit(1)

    for entry in self.list_workflow:

        name = entry["process"].__name__

        # Process title
        print()
        print(colored(f"| {name} |", "magenta"))

        # ---------------- #
        # Input parameters #
        # ---------------- #
        print(colored("> Input Parameter(s) :", "blue"))
        params = self.params_by_process[name]
        if len(params) == 0:
            print(colored("None.", "blue"))
        else:
            plugs = [self.params_plug[name][key] for key in params]
            render(
                [(f"({info[1]})", key) for key, info in params.items()],
                [undefined if plug is None else (str(plug[0]), f"({plug[1]})") for plug in plugs],
                any(plug is None for plug in plugs),
                '(X) Please define all input parameters either in "user_params" or "hard_params".',
            )

        # ----------- #
        # Input paths #
        # ----------- #
        print(colored("> Input Path(s) :", "blue"))
        paths = self.paths_by_process[name]
        if len(paths) == 0:
            print(colored("None.", "blue"))
        else:
            plugs = [self.paths_plug[name][path] for path in paths]
            render(
                [(path,) for path in paths],
                [undefined if plug is None else (plug[0], f"({plug[1]})") for plug in plugs],
                any(plug is None for plug in plugs),
                '(X) Please define all input paths either in "user_paths" or "required_paths".',
            )

        # -------------- #
        # Input analysis #
        # -------------- #
        print(colored("> Input Analysis :", "blue"))
        analysis = self.analysis_by_process[name]
        if len(analysis) == 0:
            print(colored("None.", "blue"))
        else:
            plugs = [self.analysis_plug[name][out] for out in analysis]
            render(
                [(out,) for out in analysis],
                [undefined if plug is None else (plug, "(overall_analysis)") for plug in plugs],
                any(plug is None for plug in plugs),
                '(X) Please define all output analysis in "overall_analysis".',
            )

        # ------------ #
        # Output paths #
        # ------------ #
        print(colored("> Output Path(s) :", "blue"))
        outputs = self.outputs_by_process[name]
        if len(outputs) == 0:
            print(colored("None.", "blue"))
        else:
            plugs = [self.outputs_plug[name][path] for path in outputs]
            render(
                [(path,) for path in outputs],
                [undefined if plug is None else (plug, "(output_paths)") for plug in plugs],
                any(plug is None for plug in plugs),
                '(X) Please define all output paths in "output_paths".',
            )
def set_user_params_types(self):
    """Record the type of every user parameter in ``self.params_type``.

    ``self.params_type`` maps a user parameter name to the
    ``[type, type label]`` pair of the process input it feeds. If the same
    user parameter feeds inputs of different types, an error is printed and
    the program exits with status 1.

    Only inputs plugged through ``"user_params"`` are considered. The value
    of a ``"hard_params"`` plug is a parameter *value*, not a name, so it
    must never be matched against the user parameter names (a hard-coded
    string equal to a user parameter name used to be mistaken for it).
    Inputs without any plug are skipped.
    """

    # Gather all types of parameters
    for proc, params in self.params_by_process.items():
        for param, type_info in params.items():

            plug = self.params_plug[proc][param]
            if plug is None or plug[1] != "user_params":
                continue

            user_param = plug[0]
            if user_param not in self.user_params:
                continue

            known = self.params_type.get(user_param)
            if (known is not None) and (known[0] != type_info[0]):
                print()
                print(colored(f"(X) {user_param} is defined both as ({known[1]}) and ({type_info[1]}) :", "red"))
                print(colored('> Please consider defining a new user parameter in "user_params".', "red"))
                sys.exit(1)

            self.params_type[user_param] = type_info
def print_io(self):
    """Print the user-facing inputs and outputs of the application.

    Three listings are printed: user parameters (with their type label),
    user paths and output paths. An empty listing is shown as ``None.``.
    """

    def banner(text):
        # Section banner in reverse video
        print()
        print(colored(text, "blue", attrs=["reverse"]))

    def listing(lines):
        # One blue line per item, or "None." when there is nothing to show
        for line in lines:
            print(colored(line, "blue"))
        if len(lines) == 0:
            print(colored("None.", "blue"))

    banner("> INPUTS <")

    # Print input parameters
    print()
    print(colored("| User Parameters |", "magenta"))
    listing([f"> {param} ({type_info[1]})" for param, type_info in self.params_type.items()])

    # Print input paths
    print()
    print(colored("| User Paths |", "magenta"))
    listing([f"> {path}" for path in self.user_paths])

    banner("> OUTPUTS <")
    print()
    listing([f"> {path}" for path in self.output_paths])
def define_studies(self):
    """Print the studies banner and load the list of studies.

    The studies are read from the application entry of the settings and
    stored in ``self.studies``. If none is defined, the user is pointed to
    the settings file and the program exits with status 1.
    """

    print()
    print(colored("> STUDIES <", "blue", attrs=["reverse"]))

    settings_file = self.nuremics_dir / "settings.json"
    studies = self.dict_settings["apps"][self.app_name]["studies"]

    # Guard clause: at least one study is required
    if len(studies) == 0:
        print()
        print(colored("(X) Please define at least one study in file :", "red"))
        print(colored(f"> {str(settings_file)}", "red"))
        sys.exit(1)

    self.studies = studies
def init_studies(self):
    """Synchronise ``studies.json`` with the current workflow definition.

    An existing ``studies.json`` (in the current directory) is loaded, then
    studies, user parameters, user paths and output paths that are no longer
    defined are removed. Every study of ``self.studies`` is completed with
    the missing entries: ``False`` for the ``"Default"`` study and ``None``
    for the others (``False`` for ``"clean_outputs"`` in all cases). User
    parameters and paths are reordered as in the workflow before the file is
    written back.
    """

    # Open studies json file if existing
    if os.path.exists("studies.json"):
        with open("studies.json") as stream:
            self.dict_studies = json.load(stream)

    # Clean studies
    for obsolete in [study for study in self.dict_studies if study not in self.studies]:
        del self.dict_studies[obsolete]

    # Clean input parameters, then input paths, then output paths
    sections = (
        ("user_params", self.user_params),
        ("user_paths", self.user_paths),
        ("clean_outputs", self.output_paths),
    )
    for section, still_defined in sections:
        for config in self.dict_studies.values():
            for stale in [key for key in config[section] if key not in still_defined]:
                del config[section][stale]

    # Initialize input parameters/paths
    for study in self.studies:

        if study not in self.dict_studies:
            self.dict_studies[study] = {
                "execute": True,
                "user_params": {},
                "user_paths": {},
                "clean_outputs": {},
            }
        config = self.dict_studies[study]

        # New inputs are fixed in the "Default" study, and left to be
        # configured (None) in any other study
        initial = False if study == "Default" else None

        for param in self.user_params:
            config["user_params"].setdefault(param, initial)

        for file in self.user_paths:
            config["user_paths"].setdefault(file, initial)

        for path in self.output_paths:
            config["clean_outputs"].setdefault(path, False)

        # Reordering
        config["user_params"] = {key: config["user_params"][key] for key in self.user_params}
        config["user_paths"] = {key: config["user_paths"][key] for key in self.user_paths}

    # Write studies json file
    with open("studies.json", "w") as stream:
        json.dump(self.dict_studies, stream, indent=4)
794 def test_studies_modification(self):
795 """Test if studies configurations have been modified"""
797 # Loop over studies
798 for study in self.studies:
800 self.studies_modif[study] = False
802 study_file = Path(study) / ".study.json"
803 if study_file.exists():
804 with open(study_file) as f:
805 dict_study = json.load(f)
806 if (self.dict_studies[study]["user_params"] != dict_study["user_params"]) or \
807 (self.dict_studies[study]["user_paths"] != dict_study["user_paths"]):
808 self.studies_modif[study] = True
810 def test_studies_settings(self):
811 """Check if studies has been properly configured"""
813 # Loop over studies
814 for study in self.studies:
816 self.studies_messages[study] = []
817 self.studies_config[study] = True
819 for param in self.user_params:
820 if self.dict_studies[study]["user_params"][param] is None:
821 self.studies_messages[study].append(f"(X) {param} not configured.")
822 self.studies_config[study] = False
823 else:
824 if self.dict_studies[study]["user_params"][param]: text = "variable"
825 else: text = "fixed"
826 self.studies_messages[study].append(f"(V) {param} is {text}.")
828 for file in self.user_paths:
829 if self.dict_studies[study]["user_paths"][file] is None:
830 self.studies_messages[study].append(f"(X) {file} not configured.")
831 self.studies_config[study] = False
832 else:
833 if self.dict_studies[study]["user_paths"][file]: text = "variable"
834 else: text = "fixed"
835 self.studies_messages[study].append(f"(V) {file} is {text}.")
def print_studies(self):
    """Print the configuration status of every study.

    For each study, its name is printed followed by the messages stored in
    ``self.studies_messages`` (``(V)`` in green, ``(X)`` in red). When the
    configuration of the study has been modified, a warning is printed, its
    output tree is cleaned with ``self.clean_output_tree`` and its
    ``analysis.json`` file is deleted. If the study is not properly
    configured, the user is pointed to ``studies.json`` in the current
    directory and the program exits with status 1.
    """

    for study in self.studies:

        # Printing
        print()
        print(colored(f"| {study} |", "magenta"))

        if self.studies_modif[study]:
            print(colored("(!) Configuration has been modified.", "yellow"))
            self.clean_output_tree(study)

            # Delete analysis file
            path = Path(study) / "analysis.json"
            if path.exists():
                path.unlink()

        for message in self.studies_messages[study]:
            if "(V)" in message:
                print(colored(message, "green"))
            elif "(X)" in message:
                print(colored(message, "red"))

        if not self.studies_config[study]:
            # The path is built outside the f-string: reusing double quotes
            # inside a double-quoted f-string is a syntax error before
            # Python 3.12.
            studies_file = Path.cwd() / "studies.json"
            print()
            print(colored("(X) Please configure file :", "red"))
            print(colored(f"> {str(studies_file)}", "red"))
            sys.exit(1)
def init_process_settings(self):
    """Synchronise ``<study>/process.json`` with the list of processes.

    For every study the existing file (if any) is loaded; processes that are
    no longer part of the workflow are dropped, new ones get the default
    settings ``{"execute": True, "silent": self.silent}``, and the result,
    ordered as ``self.list_processes``, is stored in
    ``self.dict_process[study]`` and written back to the file.
    """

    for study in self.studies:

        # Settings saved by a previous run (empty if there are none)
        process_file = Path(study) / "process.json"
        if os.path.exists(process_file):
            with open(process_file) as stream:
                saved = json.load(stream)
        else:
            saved = {}

        # Keep known processes, add defaults for new ones, in workflow order
        synced = {}
        for process in self.list_processes:
            if process in saved:
                synced[process] = saved[process]
            else:
                synced[process] = {
                    "execute": True,
                    "silent": self.silent,
                }
        self.dict_process[study] = synced

        # Write process json file
        with open(process_file, "w") as stream:
            json.dump(synced, stream, indent=4)
def configure_inputs(self):
    """Split the user inputs of every study into fixed and variable ones.

    An input is variable only when its setting in ``self.dict_studies`` is
    exactly ``True``; any other value makes it fixed. The resulting lists
    are stored per study in ``self.fixed_params``, ``self.variable_params``,
    ``self.fixed_paths`` and ``self.variable_paths``.
    """

    def split(settings):
        # Return (fixed, variable) names, both in the order of `settings`.
        variable = [key for key, value in settings.items() if value is True]
        fixed = [key for key, value in settings.items() if value is not True]
        return fixed, variable

    for study in self.studies:
        config = self.dict_studies[study]

        params_fixed, params_variable = split(config["user_params"])
        paths_fixed, paths_variable = split(config["user_paths"])

        self.fixed_params[study] = params_fixed
        self.variable_params[study] = params_variable
        self.fixed_paths[study] = paths_fixed
        self.variable_paths[study] = paths_variable
def init_data_tree(self):
    """Initialize data tree

    For every study, create or update inside ``<working_dir>/<study>``:

    * ``.study.json``: copy of the study configuration;
    * ``inputs.csv``: one row per dataset, only kept when the study has
      variable parameters or variable paths;
    * ``inputs.json``: values of fixed parameters/paths and, for each
      variable path, one entry per dataset; only kept when the study has
      fixed parameters, fixed paths or variable paths;
    * ``0_inputs/``: folder of user paths, with one ``0_datasets/<ID>``
      subfolder per dataset when the study has variable paths.

    Files and folders that no longer match the configuration are deleted.
    ``self.dict_datasets[study]`` (only when ``inputs.csv`` is kept) and
    ``self.dict_inputs[study]`` are updated along the way.
    """

    # Loop over studies
    for study in self.studies:

        # Initialize study directory
        study_dir:Path = self.working_dir / study
        study_dir.mkdir(
            exist_ok=True,
            parents=True,
        )

        # Write study json file
        with open(study_dir / ".study.json", "w") as f:
            json.dump(self.dict_studies[study], f, indent=4)

        # Initialize inputs csv
        inputs_file:Path = study_dir / "inputs.csv"
        if (len(self.variable_params[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            if not inputs_file.exists():

                # Create empty input dataframe
                df_inputs = pd.DataFrame(columns=["ID"]+self.variable_params[study]+["EXECUTE"])

                # Write input dataframe
                # ("ID" is a regular column here, hence index=False)
                df_inputs.to_csv(
                    path_or_buf=inputs_file,
                    index=False,
                )

            else:

                # Read input dataframe (first column becomes the index)
                df_inputs = pd.read_csv(
                    filepath_or_buffer=inputs_file,
                    index_col=0,
                )

                # Update variable parameters: add the missing ones as NaN
                # columns, then keep only the current ones (in configuration
                # order) followed by "EXECUTE"
                df_inputs = df_inputs.assign(**{param: np.nan for param in self.variable_params[study] if param not in df_inputs.columns})
                df_inputs = df_inputs[[col for col in self.variable_params[study] if col in df_inputs.columns] + ["EXECUTE"]]

                # Set default execution (empty cells become 1)
                df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int)

                # Write input dataframe
                df_inputs.to_csv(
                    path_or_buf=inputs_file,
                )

            # Define list of datasets
            # NOTE(review): index values are later used as folder names
            # (`datasets_dir / index`) and as json keys -- presumably IDs are
            # strings; confirm how numeric IDs read from the csv are handled.
            self.dict_datasets[study] = df_inputs.index.tolist()

        else:
            # Delete file
            if inputs_file.exists(): inputs_file.unlink()

        # Initialize inputs json file
        inputs_file:Path = study_dir / "inputs.json"
        if (len(self.fixed_params[study]) > 0) or \
           (len(self.fixed_paths[study]) > 0) or \
           (len(self.variable_paths[study]) > 0) :

            # Create file
            if not inputs_file.exists():

                # Initialize dictionary
                dict_inputs = {}
                if len(self.fixed_params[study]) > 0:
                    for param in self.fixed_params[study]:
                        dict_inputs[param] = None
                if len(self.fixed_paths[study]) > 0:
                    for path in self.fixed_paths[study]:
                        dict_inputs[path] = None
                if len(self.variable_paths[study]) > 0:
                    # df_inputs is defined here: the inputs csv branch above
                    # is always taken when there are variable paths
                    for path in self.variable_paths[study]:
                        dict_inputs[path] = {}
                        for index in df_inputs.index:
                            dict_inputs[path][index] = None

                # Write json
                with open(inputs_file, "w") as f:
                    json.dump(dict_inputs, f, indent=4)

            # Update file
            else:

                # Read inputs json
                with open(inputs_file) as f:
                    dict_inputs = json.load(f)

                # Update fixed parameters (existing values are kept)
                dict_fixed_params = {k: dict_inputs.get(k, None) for k in self.fixed_params[study]}

                # Update fixed paths: a dictionary value means the path was
                # stored as a variable path, so it is reset
                dict_fixed_paths = {}
                for path in self.fixed_paths[study]:
                    value = dict_inputs.get(path, None)
                    if isinstance(value, dict):
                        dict_fixed_paths[path] = None
                    else:
                        dict_fixed_paths[path] = value

                # Update variable paths: a non-dictionary value means the
                # path was stored as a fixed path, so it is reset
                dict_variable_paths = {}
                for path in self.variable_paths[study]:
                    existing_values = dict_inputs.get(path, {})
                    if not isinstance(existing_values, dict):
                        existing_values = {}
                    dict_variable_paths[path] = {
                        idx: existing_values.get(idx, None)
                        for idx in df_inputs.index
                    }

                # Update inputs dictionary (entries not listed above are dropped)
                dict_inputs = {**dict_fixed_params, **dict_fixed_paths, **dict_variable_paths}

                # Write inputs json
                with open(inputs_file, "w") as f:
                    json.dump(dict_inputs, f, indent=4)

            self.dict_inputs[study] = dict_inputs

        else:

            # Delete file
            if inputs_file.exists(): inputs_file.unlink()

            self.dict_inputs[study] = {}

        # Initialize inputs directory
        inputs_dir:Path = study_dir / "0_inputs"
        if len(self.user_paths) > 0:

            # Create inputs directory (if necessary)
            inputs_dir.mkdir(
                exist_ok=True,
                parents=True,
            )

            # Delete fixed paths (if necessary): everything that is neither a
            # current fixed path nor the datasets folder is removed
            input_paths = [f for f in inputs_dir.iterdir()]
            for path in input_paths:
                resolved_path = path.resolve().name
                if (resolved_path not in self.fixed_paths[study]) and (resolved_path != "0_datasets"):
                    if Path(path).is_file(): path.unlink()
                    else: shutil.rmtree(path)

            # Update inputs subfolders for variable paths
            datasets_dir:Path = inputs_dir / "0_datasets"
            if len(self.variable_paths[study]) > 0:

                # Create datasets directory (if necessary)
                datasets_dir.mkdir(
                    exist_ok=True,
                    parents=True,
                )

                # Create subfolders (if necessary)
                for index in df_inputs.index:

                    inputs_subfolder:Path = datasets_dir / index
                    inputs_subfolder.mkdir(
                        exist_ok=True,
                        parents=True,
                    )

                    # Delete variable paths (if necessary)
                    input_paths = [f for f in inputs_subfolder.iterdir()]
                    for path in input_paths:
                        resolved_path = path.resolve().name
                        if resolved_path not in self.variable_paths[study]:
                            if Path(path).is_file(): path.unlink()
                            else: shutil.rmtree(path)

                # Delete subfolders (if necessary): those whose name is not
                # a current dataset ID
                inputs_subfolders = [f for f in datasets_dir.iterdir() if f.is_dir()]
                for folder in inputs_subfolders:
                    id = os.path.split(folder)[-1]
                    if id not in self.dict_datasets[study]:
                        shutil.rmtree(folder)

            else:

                # Delete datasets folder (if necessary)
                if datasets_dir.exists(): shutil.rmtree(datasets_dir)

        else:
            # Delete inputs directory (if necessary)
            if inputs_dir.exists(): shutil.rmtree(inputs_dir)
def clean_output_tree(self, study: str):
    """Wipe everything a previous run produced for `study`.

    Every sub-directory of the study folder is removed except "0_inputs",
    then the ".paths.json" file is deleted when present. Regular files
    sitting directly in the study folder are left alone.
    """
    root: Path = self.working_dir / study

    # Snapshot the folders to drop before touching the tree
    stale_folders = [
        entry
        for entry in root.iterdir()
        if entry.is_dir() and entry.name != "0_inputs"
    ]
    for entry in stale_folders:
        shutil.rmtree(entry)

    # Paths file
    paths_json = root / ".paths.json"
    if paths_json.exists():
        paths_json.unlink()
def set_inputs(self):
    """Load the input values and resolve the input paths of every study.

    For each study this fills:

    - `self.dict_fixed_params[study]`: fixed parameters found in
      `self.dict_inputs[study]`;
    - `self.dict_variable_params[study]`: the content of "inputs.csv"
      (first column used as index), or an empty DataFrame when the study
      has no variable input;
    - `self.dict_user_paths[study]`: one path per fixed input path and one
      path per dataset for each variable input path. A value set in
      `self.dict_inputs[study]` is used as is; a `None` value falls back
      to the default location under "0_inputs".

    The current working directory is switched to the study folder while the
    study is processed and is always reset to `self.working_dir` afterwards,
    including when reading the inputs raises.
    """
    # Loop over studies
    for study in self.studies:

        # Go to study directory ("inputs.csv" is opened relative to it)
        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        try:
            # Default locations are built from the actual current directory
            cwd = Path(os.getcwd())

            # Initialize dictionary of input paths
            user_paths = {}
            self.dict_user_paths[study] = user_paths

            # Fixed parameters
            if self.fixed_params[study]:
                data = self.dict_inputs[study]
                self.dict_fixed_params[study] = {
                    k: data[k] for k in self.fixed_params[study] if k in data
                }
            else:
                self.dict_fixed_params[study] = {}

            # Variable parameters: the file is parsed once and its index is
            # reused below to enumerate the datasets of the variable paths
            if self.variable_params[study] or self.variable_paths[study]:
                df_inputs = pd.read_csv(
                    filepath_or_buffer="inputs.csv",
                    index_col=0,
                )
            else:
                df_inputs = pd.DataFrame()
            self.dict_variable_params[study] = df_inputs

            # Fixed paths
            for file in self.fixed_paths[study]:
                value = self.dict_inputs[study][file]
                if value is None:
                    value = str(cwd / "0_inputs" / file)
                user_paths[file] = value

            # Variable paths
            for file in self.variable_paths[study]:
                per_dataset = {}
                for idx in df_inputs.index:
                    value = self.dict_inputs[study][file][idx]
                    if value is None:
                        value = str(cwd / "0_inputs" / "0_datasets" / idx / file)
                    per_dataset[idx] = value
                user_paths[file] = per_dataset

        finally:
            # Go back to working directory, even if something failed above
            os.chdir(self.working_dir)
1203 def test_inputs_settings(self):
1204 """Test that inputs have been properly set"""
1206 # Loop over studies
1207 for study in self.studies:
1209 # Define study directory
1210 study_dir:Path = self.working_dir / study
1212 # Go to study directory
1213 os.chdir(study_dir)
1215 self.fixed_params_messages[study] = []
1216 self.fixed_paths_messages[study] = []
1217 self.fixed_params_config[study] = True
1218 self.fixed_paths_config[study] = True
1219 self.variable_params_messages[study] = {}
1220 self.variable_paths_messages[study] = {}
1221 self.variable_params_config[study] = {}
1222 self.variable_paths_config[study] = {}
1224 # Fixed parameters
1225 for param, value in self.dict_fixed_params[study].items():
1226 if value is None:
1227 self.fixed_params_messages[study].append(f"(X) {param}")
1228 self.fixed_params_config[study] = False
1229 else:
1230 if not isinstance(value, self.params_type[param][0]):
1231 self.fixed_params_messages[study].append(f"(!) {param} ({self.params_type[param][1]} expected)")
1232 else:
1233 self.fixed_params_messages[study].append(f"(V) {param}")
1235 # Fixed paths
1236 for file in self.fixed_paths[study]:
1237 file_path:Path = Path(self.dict_user_paths[study][file])
1238 if not file_path.exists():
1239 self.fixed_paths_messages[study].append(f"(X) {file}")
1240 self.fixed_paths_config[study] = False
1241 else:
1242 self.fixed_paths_messages[study].append(f"(V) {file}")
1244 # Variable inputs
1245 if (len(self.variable_params[study]) > 0) or \
1246 (len(self.variable_paths[study]) > 0):
1248 for index in self.dict_variable_params[study].index:
1250 self.variable_params_messages[study][index] = []
1251 self.variable_paths_messages[study][index] = []
1252 self.variable_params_config[study][index] = True
1253 self.variable_paths_config[study][index] = True
1255 # Variable parameters
1256 for param in self.variable_params[study]:
1257 value = self.dict_variable_params[study].at[index, param]
1258 if pd.isna(value) or value == "":
1259 self.variable_params_messages[study][index].append(f"(X) {param}")
1260 self.variable_params_config[study][index] = False
1261 else:
1262 if isinstance(value, (np.integer, np.floating, np.bool_)):
1263 value = value.item()
1264 if not isinstance(value, self.params_type[param][0]):
1265 self.variable_params_messages[study][index].append(f"(!) {param} ({self.params_type[param][1]} expected)")
1266 else:
1267 self.variable_params_messages[study][index].append(f"(V) {param}")
1269 # Variable paths
1270 for file in self.variable_paths[study]:
1271 file_path:Path = Path(self.dict_user_paths[study][file][index])
1272 if not file_path.exists():
1273 self.variable_paths_messages[study][index].append(f"(X) {file}")
1274 self.variable_paths_config[study][index] = False
1275 else:
1276 self.variable_paths_messages[study][index].append(f"(V) {file}")
1278 # Go back to working directory
1279 os.chdir(self.working_dir)
def print_inputs_settings(self):
    """Print the status of every input and stop on the first invalid study.

    Reads the "(V)" / "(X)" / "(!)" messages stored in the `*_messages`
    containers and prints them in green / red / yellow: first the inputs
    common to the whole study, then one line per dataset.

    Raises:
        SystemExit: with exit code 1 as soon as a study has a missing input,
            a parameter of the wrong type, or no dataset in "inputs.csv"
            although it declares variable inputs.
    """
    print()
    print(
        colored("> SETTINGS <", "blue", attrs=["reverse"]),
    )
    for study in self.studies:

        # Go to study directory
        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        # Files named in the error messages. They are built up front because
        # writing "inputs.json" inside a double-quoted f-string is a syntax
        # error on Python versions older than 3.12.
        inputs_json = str(Path.cwd() / "inputs.json")
        inputs_csv = str(Path.cwd() / "inputs.csv")

        # Printing
        print()
        print(colored(f"| {study} |", "magenta"))

        # ------------ #
        # Fixed inputs #
        # ------------ #
        list_text = [colored("> Common :", "blue")]
        list_errors = []
        config = True
        type_error = False

        # Fixed parameters
        for message in self.fixed_params_messages[study]:
            if "(V)" in message:
                list_text.append(colored(message, "green"))
            elif "(X)" in message:
                list_text.append(colored(message, "red"))
                # The json file is listed once, whatever the number of
                # missing parameters
                if config:
                    list_errors.append(colored(f"> {inputs_json}", "red"))
                    config = False
            elif "(!)" in message:
                list_text.append(colored(message, "yellow"))
                type_error = True

        # Fixed paths (messages are in the same order as fixed_paths)
        for i, message in enumerate(self.fixed_paths_messages[study]):
            if "(V)" in message:
                list_text.append(colored(message, "green"))
            elif "(X)" in message:
                file = self.fixed_paths[study][i]
                path = self.dict_user_paths[study][file]
                list_text.append(colored(message, "red"))
                list_errors.append(colored(f"> {path}", "red"))

        # Printing (only the header present means no fixed input)
        if len(list_text) == 1:
            print(colored("None.", "blue"))
        else:
            print(*list_text)

        if not self.fixed_params_config[study] or not self.fixed_paths_config[study]:
            print()
            print(colored("(X) Please set inputs :", "red"))
            for error in list_errors:
                print(error)
            sys.exit(1)

        if type_error:
            print()
            print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
            print(colored(f"> {inputs_json}", "red"))
            sys.exit(1)

        # --------------- #
        # Variable inputs #
        # --------------- #
        list_errors = []
        config = True
        type_error = False

        if (len(self.variable_params[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            # Check if datasets have been defined
            if len(self.dict_variable_params[study].index) == 0:
                print()
                print(colored("(X) Please define at least one dataset in file :", "red"))
                print(colored(f"> {inputs_csv}", "red"))
                sys.exit(1)

            for index in self.dict_variable_params[study].index:

                list_text = [colored(f"> {index} :", "blue")]

                # Variable parameters
                for message in self.variable_params_messages[study][index]:
                    if "(V)" in message:
                        list_text.append(colored(message, "green"))
                    elif "(X)" in message:
                        list_text.append(colored(message, "red"))
                        # The csv file is listed once for all datasets
                        if config:
                            list_errors.append(colored(f"> {inputs_csv}", "red"))
                            config = False
                    elif "(!)" in message:
                        list_text.append(colored(message, "yellow"))
                        type_error = True

                # Variable paths (messages are in the same order as variable_paths)
                for i, message in enumerate(self.variable_paths_messages[study][index]):
                    if "(V)" in message:
                        list_text.append(colored(message, "green"))
                    elif "(X)" in message:
                        file = self.variable_paths[study][i]
                        path = self.dict_user_paths[study][file][index]
                        list_text.append(colored(message, "red"))
                        list_errors.append(colored(f"> {path}", "red"))

                # Printing
                print(*list_text)

            # Errors are reported once all datasets are printed, csv file first
            list_errors.sort(key=lambda x: 0 if "inputs.csv" in x else 1)
            if len(list_errors) > 0:
                print()
                print(colored("(X) Please set inputs :", "red"))
                for error in list_errors:
                    print(error)
                sys.exit(1)

            if type_error:
                print()
                print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
                print(colored(f"> {inputs_csv}", "red"))
                sys.exit(1)

        # Go back to working directory
        os.chdir(self.working_dir)
def init_paths(self):
    """Initialize, for every study, the dictionary containing all paths.

    The dictionary is read from the study's ".paths.json" file. When that
    file is missing, unreadable, or does not hold a JSON object, a fresh
    dictionary mapping every entry of `self.output_paths` to `None` is used
    instead. Per-dataset entries whose dataset is no longer listed in
    `self.dict_datasets[study]` are then dropped, and the result is stored
    in `self.dict_paths[study]`.
    """
    # Loop over studies
    for study in self.studies:

        # Define paths file
        paths_file: Path = self.working_dir / study / ".paths.json"

        # Only I/O and decoding failures fall back to a fresh dictionary;
        # anything else (e.g. KeyboardInterrupt) propagates
        try:
            with open(paths_file) as f:
                dict_paths = json.load(f)
        except (OSError, ValueError):
            dict_paths = None

        if not isinstance(dict_paths, dict):
            dict_paths = dict.fromkeys(self.output_paths)

        # Purge old datasets
        for value in dict_paths.values():
            if isinstance(value, dict):
                # Collect first: a dict cannot shrink while being iterated
                to_delete = [
                    dataset for dataset in value
                    if dataset not in self.dict_datasets[study]
                ]
                for dataset in to_delete:
                    del value[dataset]

        self.dict_paths[study] = dict_paths
def update_analysis(self):
    """Synchronize each study's "analysis.json" with its output paths.

    For every study, the existing "analysis.json" is loaded (or an empty
    dictionary is used), then every output listed in
    `self.dict_paths[study]` gets an entry:

    - per-dataset output (value is a dict): each dataset missing from the
      analysis receives its own copy of `self.analysis_settings[out]`
      (empty settings when none are defined), and datasets no longer
      present in the paths are removed;
    - single output (value is a string): per-dataset settings do not apply,
      so any leftover dataset entry is removed;
    - output not known yet (value is None): existing settings are kept
      untouched rather than tested against `None`.

    The result is stored in `self.dict_analysis[study]` and written back to
    "analysis.json".
    """
    # Local import: `copy` is not imported at module level
    import copy

    # Loop over studies
    for study in self.studies:

        # Define analysis file
        analysis_file: Path = self.working_dir / study / "analysis.json"

        # Initialize analysis dictionary
        if analysis_file.exists():
            with open(analysis_file) as f:
                analysis = json.load(f)
        else:
            analysis = {}
        self.dict_analysis[study] = analysis

        # Browse all outputs
        for out, value in self.dict_paths[study].items():

            defaults = self.analysis_settings.get(out, {})
            settings = analysis.setdefault(out, {})

            if isinstance(value, dict):
                # Add new datasets; each one gets an independent copy so
                # editing one in memory cannot alter the others or the defaults
                for case in value:
                    if case not in settings:
                        settings[case] = copy.deepcopy(defaults)

                # Remove datasets that no longer exist
                for case in [c for c in settings if c not in value]:
                    del settings[case]

            elif isinstance(value, str):
                settings.clear()

        with open(analysis_file, "w") as f:
            json.dump(analysis, f, indent=4)
def clean_outputs(self):
    """Delete the outputs flagged for cleaning in every study.

    For each study of `self.dict_studies`, every key of its "clean_outputs"
    mapping set to a truthy value is looked up in `self.dict_paths[study]`.
    The entry is either a single path or a dictionary of paths (one per
    dataset); each existing file or directory it points to is removed.
    Entries that are not strings (e.g. `None`) are skipped.
    """
    # Function to remove output path, either file or directory
    def _remove_output(output):
        if not isinstance(output, str):
            return
        output_path = Path(output)
        if not output_path.exists():
            return
        if output_path.is_dir():
            shutil.rmtree(output_path)
        else:
            output_path.unlink()

    # Loop over studies
    for study, study_dict in self.dict_studies.items():

        # Delete specified outputs
        for key, requested in study_dict["clean_outputs"].items():
            if not requested:
                continue
            target = self.dict_paths[study][key]
            if isinstance(target, dict):
                for dataset_output in target.values():
                    _remove_output(dataset_output)
            else:
                _remove_output(target)
def purge_output_datasets(self, study: str):
    """Purge output datasets for a specific study.

    Every entry of the current working directory whose (resolved) name is
    not listed in `self.dict_datasets[study]` is deleted: real directories
    are removed recursively, anything else (regular file, symbolic link) is
    unlinked, since `shutil.rmtree` refuses those.
    """
    # Snapshot the listing before deleting anything
    for path in list(Path.cwd().iterdir()):
        if path.resolve().name in self.dict_datasets[study]:
            continue
        if path.is_dir() and not path.is_symlink():
            shutil.rmtree(path)
        else:
            path.unlink()
def update_workflow_diagram(self, process: Process):
    """Record what `process` consumes and produces in the workflow diagram.

    The entry is stored in `self.diagram` under the process name and replaces
    any entry already recorded under that name.
    """
    entry = {}
    entry["params"] = [*process.params.values()]
    entry["allparams"] = process.allparams
    entry["paths"] = [*process.paths.values()]
    entry["allpaths"] = process.allpaths
    entry["required_paths"] = [*process.required_paths.values()]
    entry["output_paths"] = [*process.output_paths.values()]

    self.diagram[process.name] = entry
1540 def __call__(self):
1541 """Launch workflow of processes."""
1543 # --------------- #
1544 # Launch workflow #
1545 # --------------- #
1546 print()
1547 print(
1548 colored("> RUNNING <", "blue", attrs=["reverse"]),
1549 )
1551 for study, dict_study in self.dict_studies.items():
1553 # Check if study must be executed
1554 if not dict_study["execute"]:
1556 # Printing
1557 print()
1558 print(
1559 colored(f"| {study} |", "magenta"),
1560 )
1561 print()
1562 print(colored("(!) Study is skipped.", "yellow"))
1564 continue
1566 study_dir:Path = self.working_dir / study
1567 os.chdir(study_dir)
1569 for step, proc in enumerate(self.list_workflow):
1571 # Update analysis
1572 self.update_analysis()
1574 if "hard_params" in proc: dict_hard_params = proc["hard_params"]
1575 else: dict_hard_params = {}
1576 if "user_params" in proc: user_params = proc["user_params"]
1577 else: user_params = {}
1578 if "user_paths" in proc: user_paths = proc["user_paths"]
1579 else: user_paths = {}
1580 if "required_paths" in proc: required_paths = proc["required_paths"]
1581 else: required_paths = {}
1582 if "output_paths" in proc: output_paths = proc["output_paths"]
1583 else: output_paths = {}
1584 if "overall_analysis" in proc: overall_analysis = proc["overall_analysis"]
1585 else: overall_analysis = {}
1587 # Define class object for the current process
1588 process = proc["process"]
1589 this_process:Process = process(
1590 study=study,
1591 df_user_params=self.dict_variable_params[study],
1592 dict_user_params=self.dict_fixed_params[study],
1593 dict_user_paths=self.dict_user_paths[study],
1594 dict_paths=self.dict_paths[study],
1595 params=user_params,
1596 paths=user_paths,
1597 dict_hard_params=dict_hard_params,
1598 fixed_params=self.fixed_params[study],
1599 variable_params=self.variable_params[study],
1600 fixed_paths=self.fixed_paths[study],
1601 variable_paths=self.variable_paths[study],
1602 required_paths=required_paths,
1603 output_paths=output_paths,
1604 overall_analysis=overall_analysis,
1605 dict_analysis=self.dict_analysis[study],
1606 silent=self.dict_process[study][self.list_processes[step]]["silent"],
1607 diagram=self.diagram,
1608 )
1610 # Define process name
1611 this_process.name = this_process.__class__.__name__
1613 # Define working folder associated to the current process
1614 folder_name = f"{step+1}_{this_process.name}"
1615 folder_path:Path = study_dir / folder_name
1616 folder_path.mkdir(exist_ok=True, parents=True)
1617 os.chdir(folder_path)
1619 # Initialize process
1620 this_process.initialize()
1622 # Check if process must be executed
1623 if not self.dict_process[study][self.list_processes[step]]["execute"]:
1625 # Printing
1626 print()
1627 print(
1628 colored(f"| {study} | {this_process.name} |", "magenta"),
1629 )
1630 print()
1631 print(colored("(!) Process is skipped.", "yellow"))
1633 # Update workflow diagram
1634 self.update_workflow_diagram(this_process)
1636 continue
1638 if this_process.is_case:
1640 # Define sub-folders associated to each ID of the inputs dataframe
1641 for idx in this_process.df_params.index:
1643 # Printing
1644 print()
1645 print(
1646 colored(f"| {study} | {this_process.name} | {idx} |", "magenta"),
1647 )
1649 # Check if dataset must be executed
1650 if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0:
1652 # Printing
1653 print()
1654 print(colored("(!) Experiment is skipped.", "yellow"))
1656 # Go back to working folder
1657 os.chdir(folder_path)
1659 # Purge old output datasets
1660 self.purge_output_datasets(study)
1662 # Update workflow diagram
1663 self.update_workflow_diagram(this_process)
1665 continue
1667 # Update process index
1668 this_process.index = idx
1670 subfolder_path = study_dir / folder_name / str(idx)
1671 subfolder_path.mkdir(exist_ok=True, parents=True)
1672 os.chdir(subfolder_path)
1674 # Launch process
1675 this_process()
1676 this_process.finalize()
1678 # Go back to working folder
1679 os.chdir(folder_path)
1681 # Purge old output datasets
1682 self.purge_output_datasets(study)
1684 else:
1686 # Printing
1687 print()
1688 print(
1689 colored(f"| {study} | {this_process.name} |", "magenta"),
1690 )
1692 # Launch process
1693 this_process()
1694 this_process.finalize()
1696 # Update workflow diagram
1697 self.update_workflow_diagram(this_process)
1699 # Update paths dictionary
1700 self.dict_paths[study] = this_process.dict_paths
1702 # Write paths json file
1703 with open(study_dir / ".paths.json", "w") as f:
1704 json.dump(self.dict_paths[study], f, indent=4)
1706 # Go back to study directory
1707 os.chdir(study_dir)
1709 # Write diagram json file
1710 with open(".diagram.json", "w") as f:
1711 json.dump(self.diagram, f, indent=4)
1713 # Go back to working directory
1714 os.chdir(self.working_dir)
1716 # Delete unnecessary outputs
1717 self.clean_outputs()