Coverage for C:\Users\julie\micromamba\envs\nrs-env\Lib\site-packages\nuremics\core\workflow.py: 82%

938 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-30 11:13 +0200

1from __future__ import annotations 

2 

3import os 

4import pathlib 

5import sys 

6 

7import json 

8import shutil 

9import numpy as np 

10import pandas as pd 

11from pathlib import Path 

12from termcolor import colored 

13 

14from .process import Process 

15from .utils import ( 

16 get_self_method_calls, 

17 only_function_calls, 

18 extract_inputs_and_types, 

19 extract_analysis, 

20 extract_self_output_keys, 

21) 

22from importlib.resources import files 

23 

24 

25class WorkFlow: 

26 """Manage workflow of processes.""" 

27 

28 def __init__( 

29 self, 

30 app_name: str, 

31 nuremics_dir: str, 

32 workflow: list, 

33 silent: bool = False, 

34 ): 

35 """Initialization.""" 

36 

37 # -------------------- # 

38 # Initialize variables # 

39 # -------------------- # 

40 self.app_name = app_name 

41 self.list_workflow = workflow 

42 self.list_processes = [] 

43 self.dict_inputs = {} 

44 self.dict_datasets = {} 

45 self.dict_studies = {} 

46 self.dict_process = {} 

47 self.dict_analysis = {} 

48 self.user_params = [] 

49 self.user_paths = [] 

50 self.output_paths = [] 

51 self.overall_analysis = [] 

52 self.analysis_settings = {} 

53 self.params_type = {} 

54 self.operations_by_process = {} 

55 self.inputs_by_process = {} 

56 self.params_by_process = {} 

57 self.paths_by_process = {} 

58 self.outputs_by_process = {} 

59 self.analysis_by_process = {} 

60 self.settings_by_process = {} 

61 self.params_plug = {} 

62 self.paths_plug = {} 

63 self.outputs_plug = {} 

64 self.analysis_plug = {} 

65 self.studies_modif = {} 

66 self.studies_messages = {} 

67 self.studies_config = {} 

68 self.fixed_params_messages = {} 

69 self.fixed_params_config = {} 

70 self.fixed_paths_messages = {} 

71 self.fixed_paths_config = {} 

72 self.variable_params_messages = {} 

73 self.variable_params_config = {} 

74 self.variable_paths_messages = {} 

75 self.variable_paths_config = {} 

76 self.fixed_params = {} 

77 self.fixed_paths = {} 

78 self.variable_params = {} 

79 self.variable_paths = {} 

80 self.dict_fixed_params = {} 

81 self.dict_variable_params = {} 

82 self.dict_user_paths = {} 

83 self.dict_paths = {} 

84 self.diagram = {} 

85 self.silent = silent 

86 

87 # ------------------------------------ # 

88 # Define and create nuremics directory # 

89 # ------------------------------------ # 

90 self.nuremics_dir = Path(nuremics_dir) / ".nuremics" 

91 self.nuremics_dir.mkdir( 

92 exist_ok=True, 

93 parents=True, 

94 ) 

95 

96 # -------------------- # 

97 # Create settings file # 

98 # -------------------- # 

99 settings_file = self.nuremics_dir / "settings.json" 

100 if not settings_file.exists(): 

101 dict_settings = { 

102 "default_working_dir": None, 

103 "apps": {}, 

104 } 

105 with open(settings_file, "w") as f: 

106 json.dump(dict_settings, f, indent=4) 

107 

108 # -------------------------- # 

109 # Define settings dictionary # 

110 # -------------------------- # 

111 with open(settings_file) as f: 

112 self.dict_settings = json.load(f) 

113 

114 # ------------------------------- # 

115 # Initialize application settings # 

116 # ------------------------------- # 

117 if self.app_name not in self.dict_settings["apps"]: 

118 self.dict_settings["apps"][self.app_name] = { 

119 "working_dir": None, 

120 "studies": [], 

121 } 

122 

123 # ----------------------------- # 

124 # Set default working directory # 

125 # ----------------------------- # 

126 if self.dict_settings["default_working_dir"] is None: 

127 for _, value in self.dict_settings["apps"].items(): 

128 if value["working_dir"] is not None: 

129 self.dict_settings["default_working_dir"] = value["working_dir"] 

130 break 

131 

132 # ------------------- # 

133 # Write settings file # 

134 # ------------------- # 

135 with open(settings_file, "w") as f: 

136 json.dump(self.dict_settings, f, indent=4) 

137 

138 # ------------------------ # 

139 # Define list of processes # 

140 # ------------------------ # 

141 for proc in self.list_workflow: 

142 self.list_processes.append(proc["process"].__name__) 

143 

def print_logo(self):
    """Print the ASCII NUREMICS logo in yellow.

    The logo is read from the "logo.txt" resource of the
    "nuremics.resources" package. The first line of that file is consumed
    and not printed; the remaining lines are printed with trailing
    whitespace stripped, after one blank line.
    """

    ascii_logo_path = files("nuremics.resources").joinpath("logo.txt")

    # The context manager closes the handle even if reading fails.
    with open(ascii_logo_path, "r") as f:
        # Skip the first line (as the previous implementation did) without
        # failing when the file is empty.
        next(f, None)
        lines = f.readlines()

    print()
    for line in lines:
        print(colored(line.rstrip(), "yellow"))

154 

def print_application(self):
    """Print the workflow diagram (processes and their operations).

    Fills ``self.operations_by_process`` for every process along the way.
    Exits with status 1 when the ``__call__`` of a process does anything
    other than calling its own operations.
    """

    # Header
    print()
    print(colored("> APPLICATION <", "blue", attrs=["reverse"]))
    print()
    print(colored("| Workflow |", "magenta"))
    print(colored(f"{self.app_name}_____", "blue"))

    # Horizontal offset taken by the application name and its underline
    app_indent = " " * (len(self.app_name) + 5)
    last_index = len(self.list_workflow) - 1

    invalid_found = False
    for index, proc in enumerate(self.list_workflow):

        proc_name = proc["process"].__name__
        instance = proc["process"]()

        # Horizontal offset taken by the process name and its underlines
        proc_indent = " " * (len(proc_name) + 10)

        # Operations called by the process
        operations = get_self_method_calls(instance.__class__)
        self.operations_by_process[proc_name] = operations

        # The process call must only contain calls to its operations
        call_is_valid = only_function_calls(
            method=instance.__call__,
            allowed_methods=operations,
        )

        branch = app_indent + f"|_____{proc_name}_____"
        if not call_is_valid:
            print(colored(branch, "blue") + colored("(X)", "red"))
            invalid_found = True
        else:
            print(colored(branch, "blue"))
            # Keep the vertical bar of the workflow while processes follow
            if index < last_index:
                prefix = app_indent + "|" + proc_indent
            else:
                prefix = " " * (len(self.app_name) + 5 + 1) + proc_indent
            for op_name in operations:
                print(colored(prefix + f"|_____{op_name}", "blue"))

        # Vertical link towards the next process
        if index < last_index:
            print(colored(app_indent + "|", "blue"))

    if not invalid_found:
        return

    # Explain the expected shape of a process call, then stop
    print()
    print(colored("(X) Each process must only call its internal function(s):", "red"))
    print()
    print(colored(" def __call__(self):", "red"))
    print(colored(" super().__call__()", "red"))
    print()
    for example in (
        " self.operation1()",
        " self.operation2()",
        " self.operation3()",
        " ...",
    ):
        print(colored(example, "red"))
    sys.exit(1)

234 

def set_working_directory(self):
    """Resolve, persist, create and enter the application working directory.

    When the application has no "working_dir" in the settings, the user is
    asked whether "default_working_dir" can be used; the program exits with
    status 1 if there is no default or if the user declines.
    """

    settings_file = self.nuremics_dir / "settings.json"
    app_settings = self.dict_settings["apps"][self.app_name]

    def abort_undefined():
        """Tell the user where to define the working directory and exit."""
        print()
        print(colored(f'(X) Please define {self.app_name} "working_dir" in file :', "red"))
        print(colored(f"> {str(settings_file)}", "red"))
        sys.exit(1)

    # --------------------- #
    # Set working directory #
    # --------------------- #
    if app_settings["working_dir"] is None:
        if self.dict_settings["default_working_dir"] is None:
            abort_undefined()

        print()
        print(colored(f'(!) Found "default_working_dir": {self.dict_settings["default_working_dir"]}', "yellow"))

        # Keep asking until the answer is recognised
        accepted = False
        while not accepted:
            answer = input(colored(f'Accept it as "working_dir" for {self.app_name}: [Y/n] ', "yellow")).strip().lower()
            if answer in ("y", "yes", ""):
                app_settings["working_dir"] = self.dict_settings["default_working_dir"]
                accepted = True
            elif answer in ("n", "no"):
                abort_undefined()

    self.working_dir = Path(app_settings["working_dir"]) / self.app_name

    # ------------------- #
    # Write settings file #
    # ------------------- #
    with open(settings_file, "w") as f:
        json.dump(self.dict_settings, f, indent=4)

    # ----------------------------------- #
    # Create and enter working directory  #
    # ----------------------------------- #
    self.working_dir.mkdir(parents=True, exist_ok=True)
    os.chdir(self.working_dir)

282 

def get_inputs(self):
    """Collect the inputs of every process and how the workflow plugs them.

    For each process, the inputs are split into parameters, paths
    (``pathlib.Path`` subclasses) and analysis entries, and the matching
    ``*_plug`` dictionaries record the workflow definition attached to each
    one (``None`` when the workflow does not define it).
    """

    for proc in self.list_workflow:

        process_cls = proc["process"]
        name = process_cls.__name__
        instance = process_cls()

        self.inputs_by_process[name] = extract_inputs_and_types(instance)
        self.analysis_by_process[name], self.settings_by_process[name] = extract_analysis(instance)

        self.params_by_process[name] = {}
        self.paths_by_process[name] = []
        self.params_plug[name] = {}
        self.paths_plug[name] = {}
        self.analysis_plug[name] = {}

        for key, value_type in self.inputs_by_process[name].items():

            # Human-readable type: builtins are shown without module prefix
            module_name = value_type.__module__
            type_name = value_type.__name__
            type_label = type_name if module_name == "builtins" else f"{module_name}.{type_name}"

            # Analysis entry
            if key in self.analysis_by_process[name]:
                plug = None
                if ("overall_analysis" in proc) and (key in proc["overall_analysis"]):
                    plug = proc["overall_analysis"][key]
                self.analysis_plug[name][key] = plug
                continue

            # Path or parameter: the first section defining the key wins
            if issubclass(value_type, pathlib.Path):
                self.paths_by_process[name].append(key)
                sections = ("user_paths", "required_paths")
                target = self.paths_plug[name]
            else:
                self.params_by_process[name][key] = [value_type, type_label]
                sections = ("user_params", "hard_params")
                target = self.params_plug[name]

            plug = None
            for section in sections:
                if (section in proc) and (key in proc[section]):
                    plug = [proc[section][key], section]
                    break
            target[key] = plug

337 

def get_outputs(self):
    """Collect the output keys of every process and their workflow plugs.

    Output keys are gathered from each operation listed in
    ``self.operations_by_process`` (duplicates removed, first occurrence
    kept) and mapped to the "output_paths" entry of the workflow definition,
    or to ``None`` when the workflow does not define them.
    """

    for proc in self.list_workflow:

        process_cls = proc["process"]
        name = process_cls.__name__
        instance = process_cls()

        collected = []
        plugs = {}
        self.outputs_by_process[name] = collected
        self.outputs_plug[name] = plugs

        # Gather output keys operation by operation
        for operation in self.operations_by_process[name]:
            for output_key in extract_self_output_keys(getattr(instance, operation)):
                if output_key not in collected:
                    collected.append(output_key)

        # Attach the user definition of each output, if any
        for output_key in collected:
            if ("output_paths" in proc) and (output_key in proc["output_paths"]):
                plugs[output_key] = proc["output_paths"][output_key]
            else:
                plugs[output_key] = None

361 

def init_config(self):
    """Validate the workflow definition and build the global lists.

    Fills ``user_params``, ``user_paths``, ``output_paths`` and
    ``overall_analysis`` from the workflow, then ``analysis_settings``.
    Any inconsistent entry is reported in red and ends the program with
    status 1.
    """

    def fail(message):
        """Report a configuration error and exit."""
        print()
        print(colored(message, "red"))
        sys.exit(1)

    for step in self.list_workflow:

        name = step["process"].__name__

        # User parameters must be input parameters of the process
        if "user_params" in step:
            for key, value in step["user_params"].items():
                if key not in self.params_by_process[name]:
                    fail(f'(X) {key} defined in "user_params" is not an input parameter of {name}.')
                self.user_params.append(value)

        # Hard parameters must be input parameters of the process
        if "hard_params" in step:
            for key in step["hard_params"]:
                if key not in self.params_by_process[name]:
                    fail(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.')

        # User paths must be input paths of the process
        if "user_paths" in step:
            for key, value in step["user_paths"].items():
                if key not in self.paths_by_process[name]:
                    fail(f"(X) {key} is not an input path of {name}.")
                self.user_paths.append(value)

        # Required paths must have been produced by a previous process
        if "required_paths" in step:
            for value in step["required_paths"].values():
                if value not in self.output_paths:
                    fail(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".')

        # Output paths must be outputs of the process and unique overall
        if "output_paths" in step:
            for key, value in step["output_paths"].items():
                if key not in self.outputs_by_process[name]:
                    fail(f"(X) {key} is not an output path of {name}.")
                if value in self.output_paths:
                    fail(f'(X) {value} is defined twice in "output_paths".')
                self.output_paths.append(value)

        # Analysis entries must exist in the process and target known outputs
        if "overall_analysis" in step:
            for key, value in step["overall_analysis"].items():
                if key not in self.analysis_by_process[name]:
                    fail(f"(X) {key} is not an output analysis of {name}.")
                self.overall_analysis.append(value)
                if value not in self.output_paths:
                    fail(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".')

    # Remove duplicates while keeping the first-seen order
    self.user_params = list(dict.fromkeys(self.user_params))
    self.user_paths = list(dict.fromkeys(self.user_paths))
    self.overall_analysis = list(dict.fromkeys(self.overall_analysis))

    # One settings dictionary per analysed output
    for output in self.overall_analysis:
        self.analysis_settings[output] = {}

    # Merge the settings declared by the processes into their plugged output
    for proc_name, settings in self.settings_by_process.items():
        if not settings:
            continue
        for analysis_key, setting in settings.items():
            target = self.analysis_plug[proc_name][analysis_key]
            self.analysis_settings[target].update(setting)

449 

def print_processes(self):
    """Print, for every process, its inputs/outputs and their workflow plugs.

    Four tables are printed per process (input parameters, input paths,
    input analysis, output paths). Each row shows the process-side name on
    the left and the workflow definition on the right. As soon as a table
    contains an undefined entry, an explanation is printed and the program
    exits with status 1.
    """

    def emit_table(left_rows, right_rows, failed, failure_text):
        """Print aligned rows, then abort if an entry was not defined."""
        # Every column is padded to its widest cell plus one space
        left_widths = [
            max(len(row[col]) for row in left_rows) + 1
            for col in range(len(left_rows[0]))
        ]
        right_widths = [
            max(len(row[col]) for row in right_rows) + 1
            for col in range(2)
        ]

        for left, right in zip(left_rows, right_rows):
            proc_str = "".join(cell.ljust(width) for cell, width in zip(left, left_widths)) + "-----|"
            user_str = "|----- " + "".join(cell.ljust(width) for cell, width in zip(right, right_widths))
            color = "red" if "(X)" in user_str else "green"
            print(colored(proc_str, "blue") + colored(user_str, color))

        if failed:
            print()
            print(colored(failure_text, "red"))
            sys.exit(1)

    def simple_section(name, title, keys_by_process, plugs_by_process, describe, failure_text):
        """Print a table whose left side is a single column of keys."""
        print(colored(title, "blue"))
        keys = keys_by_process[name]
        if len(keys) == 0:
            print(colored("None.", "blue"))
            return

        left_rows = []
        right_rows = []
        failed = False
        for key in keys:
            left_rows.append((key,))
            plug = plugs_by_process[name][key]
            if plug is not None:
                right_rows.append(describe(plug))
            else:
                right_rows.append(("Not defined", "(X)"))
                failed = True

        emit_table(left_rows, right_rows, failed, failure_text)

    for proc in self.list_workflow:

        name = proc["process"].__name__

        print()
        print(colored(f"| {name} |", "magenta"))

        # ---------------- #
        # Input parameters #
        # ---------------- #
        print(colored("> Input Parameter(s) :", "blue"))
        params = self.params_by_process[name]
        if len(params) == 0:
            print(colored("None.", "blue"))
        else:
            left_rows = []
            right_rows = []
            failed = False
            for key, value in params.items():
                # Left side: "(type)" then the parameter name
                left_rows.append((f"({value[1]})", key))
                plug = self.params_plug[name][key]
                if plug is not None:
                    right_rows.append((str(plug[0]), f"({plug[1]})"))
                else:
                    right_rows.append(("Not defined", "(X)"))
                    failed = True

            emit_table(
                left_rows,
                right_rows,
                failed,
                '(X) Please define all input parameters either in "user_params" or "hard_params".',
            )

        # ----------- #
        # Input paths #
        # ----------- #
        simple_section(
            name,
            "> Input Path(s) :",
            self.paths_by_process,
            self.paths_plug,
            lambda plug: (plug[0], f"({plug[1]})"),
            '(X) Please define all input paths either in "user_paths" or "required_paths".',
        )

        # -------------- #
        # Input analysis #
        # -------------- #
        simple_section(
            name,
            "> Input Analysis :",
            self.analysis_by_process,
            self.analysis_plug,
            lambda plug: (plug, "(overall_analysis)"),
            '(X) Please define all output analysis in "overall_analysis".',
        )

        # ------------ #
        # Output paths #
        # ------------ #
        simple_section(
            name,
            "> Output Path(s) :",
            self.outputs_by_process,
            self.outputs_plug,
            lambda plug: (plug, "(output_paths)"),
            '(X) Please define all output paths in "output_paths".',
        )

641 

def set_user_params_types(self):
    """Record the type of every user parameter in ``self.params_type``.

    Exits with status 1 when the same user parameter is plugged to process
    parameters of different types.
    """

    for proc_name, params in self.params_by_process.items():
        for param, type_info in params.items():

            # Name given by the workflow to this process parameter
            user_param = self.params_plug[proc_name][param][0]
            if user_param not in self.user_params:
                continue

            # A user parameter already seen must keep the same type
            if user_param in self.params_type:
                if self.params_type[user_param][0] != type_info[0]:
                    print()
                    print(colored(f"(X) {user_param} is defined both as ({self.params_type[user_param][1]}) and ({type_info[1]}) :", "red"))
                    print(colored('> Please consider defining a new user parameter in "user_params".', "red"))
                    sys.exit(1)

            self.params_type[user_param] = type_info

656 

def print_io(self):
    """Print the user parameters, the user paths and the output paths.

    Each list is followed by "None." when it is empty.
    """

    # ------ #
    # Inputs #
    # ------ #
    print()
    print(colored("> INPUTS <", "blue", attrs=["reverse"]))

    # User parameters with their type
    print()
    print(colored("| User Parameters |", "magenta"))
    for param, type_info in self.params_type.items():
        print(colored(f"> {param} ({type_info[1]})", "blue"))
    if not self.params_type:
        print(colored("None.", "blue"))

    # User paths
    print()
    print(colored("| User Paths |", "magenta"))
    for user_path in self.user_paths:
        print(colored(f"> {user_path}", "blue"))
    if not self.user_paths:
        print(colored("None.", "blue"))

    # ------- #
    # Outputs #
    # ------- #
    print()
    print(colored("> OUTPUTS <", "blue", attrs=["reverse"]))
    print()
    for output_path in self.output_paths:
        print(colored(f"> {output_path}", "blue"))
    if not self.output_paths:
        print(colored("None.", "blue"))

708 

def define_studies(self):
    """Load the list of studies of the application from the settings.

    Exits with status 1 when the settings declare no study.
    """

    print()
    print(colored("> STUDIES <", "blue", attrs=["reverse"]))

    settings_file = self.nuremics_dir / "settings.json"
    declared = self.dict_settings["apps"][self.app_name]["studies"]

    # Nominal case: at least one study is declared
    if len(declared) != 0:
        self.studies = declared
        return

    print()
    print(colored("(X) Please define at least one study in file :", "red"))
    print(colored(f"> {str(settings_file)}", "red"))
    sys.exit(1)

725 

def init_studies(self):
    """Synchronise "studies.json" (current directory) with the workflow.

    Entries that no longer exist in the workflow are dropped, missing ones
    are added with their default value, and the result is written back.
    """

    # Start from the existing file when there is one
    if os.path.exists("studies.json"):
        with open("studies.json") as f:
            self.dict_studies = json.load(f)

    # Drop studies that are not declared anymore
    for obsolete in [s for s in self.dict_studies if s not in self.studies]:
        del self.dict_studies[obsolete]

    # Drop parameters / paths / outputs unknown to the workflow
    for section, allowed in (
        ("user_params", self.user_params),
        ("user_paths", self.user_paths),
        ("clean_outputs", self.output_paths),
    ):
        for study in list(self.dict_studies.keys()):
            entries = self.dict_studies[study][section]
            for key in list(entries):
                if key not in allowed:
                    del entries[key]

    # Add what is missing, study by study
    for study in self.studies:

        if study not in self.dict_studies:
            self.dict_studies[study] = {
                "execute": True,
                "user_params": {},
                "user_paths": {},
                "clean_outputs": {},
            }
        entry = self.dict_studies[study]

        # Inputs of the "Default" study start as False; in any other
        # study they start as None
        initial = False if study == "Default" else None

        for param in self.user_params:
            entry["user_params"].setdefault(param, initial)

        for user_path in self.user_paths:
            entry["user_paths"].setdefault(user_path, initial)

        for output_path in self.output_paths:
            entry["clean_outputs"].setdefault(output_path, False)

        # Follow the workflow ordering
        entry["user_params"] = {k: entry["user_params"][k] for k in self.user_params}
        entry["user_paths"] = {k: entry["user_paths"][k] for k in self.user_paths}

    # Persist the result
    with open("studies.json", "w") as f:
        json.dump(self.dict_studies, f, indent=4)

793 

794 def test_studies_modification(self): 

795 """Test if studies configurations have been modified""" 

796 

797 # Loop over studies 

798 for study in self.studies: 

799 

800 self.studies_modif[study] = False 

801 

802 study_file = Path(study) / ".study.json" 

803 if study_file.exists(): 

804 with open(study_file) as f: 

805 dict_study = json.load(f) 

806 if (self.dict_studies[study]["user_params"] != dict_study["user_params"]) or \ 

807 (self.dict_studies[study]["user_paths"] != dict_study["user_paths"]): 

808 self.studies_modif[study] = True 

809 

810 def test_studies_settings(self): 

811 """Check if studies has been properly configured""" 

812 

813 # Loop over studies 

814 for study in self.studies: 

815 

816 self.studies_messages[study] = [] 

817 self.studies_config[study] = True 

818 

819 for param in self.user_params: 

820 if self.dict_studies[study]["user_params"][param] is None: 

821 self.studies_messages[study].append(f"(X) {param} not configured.") 

822 self.studies_config[study] = False 

823 else: 

824 if self.dict_studies[study]["user_params"][param]: text = "variable" 

825 else: text = "fixed" 

826 self.studies_messages[study].append(f"(V) {param} is {text}.") 

827 

828 for file in self.user_paths: 

829 if self.dict_studies[study]["user_paths"][file] is None: 

830 self.studies_messages[study].append(f"(X) {file} not configured.") 

831 self.studies_config[study] = False 

832 else: 

833 if self.dict_studies[study]["user_paths"][file]: text = "variable" 

834 else: text = "fixed" 

835 self.studies_messages[study].append(f"(V) {file} is {text}.") 

836 

def print_studies(self):
    """Print the configuration status of every study.

    For a study flagged in ``self.studies_modif``, a warning is printed, its
    output tree is cleaned through ``self.clean_output_tree`` and its
    "analysis.json" file is deleted if present. The messages stored in
    ``self.studies_messages`` are then printed ("(V)" in green, "(X)" in
    red). The program exits with status 1 at the first study that is not
    fully configured.
    """

    for study in self.studies:

        # Study header
        print()
        print(colored(f"| {study} |", "magenta"))

        if self.studies_modif[study]:
            print(colored("(!) Configuration has been modified.", "yellow"))
            self.clean_output_tree(study)

            # Delete analysis file
            analysis_file = Path(study) / "analysis.json"
            if analysis_file.exists():
                analysis_file.unlink()

        for message in self.studies_messages[study]:
            if "(V)" in message:
                print(colored(message, "green"))
            elif "(X)" in message:
                print(colored(message, "red"))

        if not self.studies_config[study]:
            # Build the path outside the f-string: reusing the enclosing
            # quote character inside a replacement field is a syntax error
            # before Python 3.12.
            studies_file = Path.cwd() / "studies.json"
            print()
            print(colored("(X) Please configure file :", "red"))
            print(colored(f"> {studies_file}", "red"))
            sys.exit(1)

866 

def init_process_settings(self):
    """Synchronise the "process.json" file of every study with the workflow.

    Settings of processes still in the workflow are kept, removed processes
    are dropped, and new processes get ``{"execute": True, "silent":
    self.silent}``. Entries follow the order of ``self.list_processes``.
    """

    for study in self.studies:

        process_file = Path(study) / "process.json"

        # Previously saved settings, if any
        if os.path.exists(process_file):
            with open(process_file) as f:
                self.dict_process[study] = json.load(f)
        else:
            self.dict_process[study] = {}
        saved = self.dict_process[study]

        # Keep known processes, add the new ones, in workflow order
        self.dict_process[study] = {
            process: (
                saved[process]
                if process in saved
                else {"execute": True, "silent": self.silent}
            )
            for process in self.list_processes
        }

        # Persist the result
        with open(process_file, "w") as f:
            json.dump(self.dict_process[study], f, indent=4)

900 

def configure_inputs(self):
    """Split the inputs of every study into fixed and variable lists.

    An input is variable when its value in ``self.dict_studies`` is exactly
    ``True``; any other value makes it fixed.
    """

    for study in self.studies:

        params = self.dict_studies[study]["user_params"]
        paths = self.dict_studies[study]["user_paths"]

        # Parameters
        fixed_params = [key for key, flag in params.items() if flag is not True]
        variable_params = [key for key, flag in params.items() if flag is True]

        # Paths
        fixed_paths = [key for key, flag in paths.items() if flag is not True]
        variable_paths = [key for key, flag in paths.items() if flag is True]

        self.fixed_params[study] = fixed_params
        self.variable_params[study] = variable_params
        self.fixed_paths[study] = fixed_paths
        self.variable_paths[study] = variable_paths

924 

def init_data_tree(self):
    """Create or refresh the on-disk input tree of every study.

    For each study of ``self.studies``, inside ``self.working_dir / study``:

    - ``.study.json`` is rewritten from ``self.dict_studies[study]``;
    - ``inputs.csv`` is created (or re-aligned on the variable parameters)
      when the study has variable inputs, and deleted otherwise;
    - ``inputs.json`` is created (or re-aligned on the fixed parameters,
      fixed paths and variable paths) when needed, and deleted otherwise;
    - ``0_inputs`` and ``0_inputs/0_datasets/<ID>`` are created, and any
      entry that no longer matches a declared path or dataset is removed.

    ``self.dict_datasets[study]`` (only for studies with variable inputs)
    and ``self.dict_inputs[study]`` are updated along the way.
    """

    def _remove(entry: Path) -> None:
        # Files are unlinked, anything else is removed as a directory tree
        if entry.is_file():
            entry.unlink()
        else:
            shutil.rmtree(entry)

    # Loop over studies
    for study in self.studies:

        fixed_params = self.fixed_params[study]
        fixed_paths = self.fixed_paths[study]
        variable_params = self.variable_params[study]
        variable_paths = self.variable_paths[study]

        # Initialize study directory
        study_dir: Path = self.working_dir / study
        study_dir.mkdir(exist_ok=True, parents=True)

        # Write study json file
        with open(study_dir / ".study.json", "w") as f:
            json.dump(self.dict_studies[study], f, indent=4)

        # ---------- #
        # inputs.csv #
        # ---------- #
        csv_file: Path = study_dir / "inputs.csv"
        if variable_params or variable_paths:

            if not csv_file.exists():

                # Create and write an empty input dataframe
                df_inputs = pd.DataFrame(columns=["ID"] + variable_params + ["EXECUTE"])
                df_inputs.to_csv(path_or_buf=csv_file, index=False)

            else:

                # Read input dataframe (first column, i.e. the IDs, is the index)
                df_inputs = pd.read_csv(filepath_or_buffer=csv_file, index_col=0)

                # Add every expected column that is missing. EXECUTE is
                # included so that a file edited without it does not raise
                # a KeyError on the re-ordering below.
                expected = variable_params + ["EXECUTE"]
                missing = [col for col in expected if col not in df_inputs.columns]
                df_inputs = df_inputs.assign(**{col: np.nan for col in missing})

                # Drop obsolete columns and restore the expected order
                df_inputs = df_inputs[expected]

                # Set default execution
                df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int)

                # Write input dataframe
                df_inputs.to_csv(path_or_buf=csv_file)

            # Define list of datasets
            self.dict_datasets[study] = df_inputs.index.tolist()

        elif csv_file.exists():
            # No variable input anymore: delete file
            csv_file.unlink()

        # ----------- #
        # inputs.json #
        # ----------- #
        json_file: Path = study_dir / "inputs.json"
        if fixed_params or fixed_paths or variable_paths:

            if not json_file.exists():

                # Create file: every input starts unset
                dict_inputs = {}
                for param in fixed_params:
                    dict_inputs[param] = None
                for path in fixed_paths:
                    dict_inputs[path] = None
                for path in variable_paths:
                    dict_inputs[path] = {index: None for index in df_inputs.index}

            else:

                # Update file: keep the values already set by the user
                with open(json_file) as f:
                    stored = json.load(f)

                # Fixed parameters
                dict_inputs = {param: stored.get(param) for param in fixed_params}

                # Fixed paths (a dict means the path used to be variable)
                for path in fixed_paths:
                    value = stored.get(path)
                    dict_inputs[path] = None if isinstance(value, dict) else value

                # Variable paths (anything but a dict means the path used to be fixed)
                for path in variable_paths:
                    existing = stored.get(path, {})
                    if not isinstance(existing, dict):
                        existing = {}
                    per_dataset = {}
                    for idx in df_inputs.index:
                        value = existing.get(idx)
                        if value is None:
                            # JSON object keys are always strings, whereas
                            # IDs read from the csv may be integers
                            value = existing.get(str(idx))
                        per_dataset[idx] = value
                    dict_inputs[path] = per_dataset

            # Write inputs json
            with open(json_file, "w") as f:
                json.dump(dict_inputs, f, indent=4)

            self.dict_inputs[study] = dict_inputs

        else:

            # Delete file
            if json_file.exists():
                json_file.unlink()

            self.dict_inputs[study] = {}

        # ---------------- #
        # Inputs directory #
        # ---------------- #
        inputs_dir: Path = study_dir / "0_inputs"
        if not self.user_paths:
            # Delete inputs directory (if necessary)
            if inputs_dir.exists():
                shutil.rmtree(inputs_dir)
            continue

        # Create inputs directory (if necessary)
        inputs_dir.mkdir(exist_ok=True, parents=True)

        # Delete entries that are not fixed paths (if necessary)
        for entry in list(inputs_dir.iterdir()):
            entry_name = entry.resolve().name
            if (entry_name not in fixed_paths) and (entry_name != "0_datasets"):
                _remove(entry)

        # Update inputs subfolders for variable paths
        datasets_dir: Path = inputs_dir / "0_datasets"
        if not variable_paths:
            # Delete datasets folder (if necessary)
            if datasets_dir.exists():
                shutil.rmtree(datasets_dir)
            continue

        # Create datasets directory (if necessary)
        datasets_dir.mkdir(exist_ok=True, parents=True)

        # Create one subfolder per dataset (if necessary)
        for index in df_inputs.index:

            # str() because IDs parsed from the csv are not always strings
            inputs_subfolder: Path = datasets_dir / str(index)
            inputs_subfolder.mkdir(exist_ok=True, parents=True)

            # Delete entries that are not variable paths (if necessary)
            for entry in list(inputs_subfolder.iterdir()):
                if entry.resolve().name not in variable_paths:
                    _remove(entry)

        # Delete subfolders of datasets that no longer exist (if necessary)
        known_datasets = {str(dataset) for dataset in self.dict_datasets[study]}
        for folder in [f for f in datasets_dir.iterdir() if f.is_dir()]:
            if folder.name not in known_datasets:
                shutil.rmtree(folder)

1118 

def clean_output_tree(self,
    study: str,
):
    """Delete the output data of one study.

    Every sub-directory of ``self.working_dir / study`` except ``0_inputs``
    is removed, as well as the ``.paths.json`` file when it exists. Plain
    files other than ``.paths.json`` are left untouched.
    """

    # Study directory
    study_dir: Path = self.working_dir / study

    # Sub-directories are listed first, then removed (inputs are kept)
    subdirs = [entry for entry in study_dir.iterdir() if entry.is_dir()]
    for entry in subdirs:
        if entry.name == "0_inputs":
            continue
        shutil.rmtree(entry)

    # Paths file
    paths_file = study_dir / ".paths.json"
    if paths_file.exists():
        paths_file.unlink()

1136 

def set_inputs(self):
    """Collect the values of all inputs of every study.

    For each study of ``self.studies`` the following attributes are set:

    - ``self.dict_fixed_params[study]``: fixed parameters found in
      ``self.dict_inputs[study]``;
    - ``self.dict_variable_params[study]``: content of ``inputs.csv``
      (indexed by its first column) when the study has variable inputs,
      an empty dataframe otherwise;
    - ``self.dict_user_paths[study]``: for each fixed path a string, for
      each variable path a dict ``{dataset: string}``. A value set in
      ``self.dict_inputs[study]`` is used as is; an unset one (``None``)
      defaults to a location inside the ``0_inputs`` folder of the study.

    The process moves into each study directory while working and always
    comes back to ``self.working_dir``, even when an error is raised.
    """

    # Loop over studies
    for study in self.studies:

        # Go to study directory
        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        try:
            cwd = Path(os.getcwd())
            inputs = self.dict_inputs.get(study, {})

            # Initialize dictionary of input paths
            self.dict_user_paths[study] = {}

            # Fixed parameters
            self.dict_fixed_params[study] = {
                param: inputs[param]
                for param in self.fixed_params[study]
                if param in inputs
            }

            # Variable parameters (read once, reused for variable paths below)
            if self.variable_params[study] or self.variable_paths[study]:
                df_inputs = pd.read_csv(
                    filepath_or_buffer="inputs.csv",
                    index_col=0,
                )
            else:
                df_inputs = pd.DataFrame()
            self.dict_variable_params[study] = df_inputs

            user_paths = {}

            # Fixed paths
            for file in self.fixed_paths[study]:
                value = inputs[file]
                if value is None:
                    value = str(cwd / "0_inputs" / file)
                user_paths[file] = value

            # Variable paths
            for file in self.variable_paths[study]:
                per_dataset = {}
                for idx in df_inputs.index:
                    value = inputs[file][idx]
                    if value is None:
                        # str() because IDs parsed from the csv are not always strings
                        value = str(cwd / "0_inputs" / "0_datasets" / str(idx) / file)
                    per_dataset[idx] = value
                user_paths[file] = per_dataset

            self.dict_user_paths[study] = user_paths

        finally:
            # Go back to working directory
            os.chdir(self.working_dir)

1202 

1203 def test_inputs_settings(self): 

1204 """Test that inputs have been properly set""" 

1205 

1206 # Loop over studies 

1207 for study in self.studies: 

1208 

1209 # Define study directory 

1210 study_dir:Path = self.working_dir / study 

1211 

1212 # Go to study directory 

1213 os.chdir(study_dir) 

1214 

1215 self.fixed_params_messages[study] = [] 

1216 self.fixed_paths_messages[study] = [] 

1217 self.fixed_params_config[study] = True 

1218 self.fixed_paths_config[study] = True 

1219 self.variable_params_messages[study] = {} 

1220 self.variable_paths_messages[study] = {} 

1221 self.variable_params_config[study] = {} 

1222 self.variable_paths_config[study] = {} 

1223 

1224 # Fixed parameters 

1225 for param, value in self.dict_fixed_params[study].items(): 

1226 if value is None: 

1227 self.fixed_params_messages[study].append(f"(X) {param}") 

1228 self.fixed_params_config[study] = False 

1229 else: 

1230 if not isinstance(value, self.params_type[param][0]): 

1231 self.fixed_params_messages[study].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1232 else: 

1233 self.fixed_params_messages[study].append(f"(V) {param}") 

1234 

1235 # Fixed paths 

1236 for file in self.fixed_paths[study]: 

1237 file_path:Path = Path(self.dict_user_paths[study][file]) 

1238 if not file_path.exists(): 

1239 self.fixed_paths_messages[study].append(f"(X) {file}") 

1240 self.fixed_paths_config[study] = False 

1241 else: 

1242 self.fixed_paths_messages[study].append(f"(V) {file}") 

1243 

1244 # Variable inputs 

1245 if (len(self.variable_params[study]) > 0) or \ 

1246 (len(self.variable_paths[study]) > 0): 

1247 

1248 for index in self.dict_variable_params[study].index: 

1249 

1250 self.variable_params_messages[study][index] = [] 

1251 self.variable_paths_messages[study][index] = [] 

1252 self.variable_params_config[study][index] = True 

1253 self.variable_paths_config[study][index] = True 

1254 

1255 # Variable parameters 

1256 for param in self.variable_params[study]: 

1257 value = self.dict_variable_params[study].at[index, param] 

1258 if pd.isna(value) or value == "": 

1259 self.variable_params_messages[study][index].append(f"(X) {param}") 

1260 self.variable_params_config[study][index] = False 

1261 else: 

1262 if isinstance(value, (np.integer, np.floating, np.bool_)): 

1263 value = value.item() 

1264 if not isinstance(value, self.params_type[param][0]): 

1265 self.variable_params_messages[study][index].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1266 else: 

1267 self.variable_params_messages[study][index].append(f"(V) {param}") 

1268 

1269 # Variable paths 

1270 for file in self.variable_paths[study]: 

1271 file_path:Path = Path(self.dict_user_paths[study][file][index]) 

1272 if not file_path.exists(): 

1273 self.variable_paths_messages[study][index].append(f"(X) {file}") 

1274 self.variable_paths_config[study][index] = False 

1275 else: 

1276 self.variable_paths_messages[study][index].append(f"(V) {file}") 

1277 

1278 # Go back to working directory 

1279 os.chdir(self.working_dir) 

1280 

def print_inputs_settings(self):
    """Print the status of every input and stop when one is not usable.

    For each study the recorded messages are printed on one line for the
    fixed inputs ("Common") and on one line per dataset for the variable
    inputs: ``(V)`` in green, ``(X)`` in red, ``(!)`` in yellow.
    The program exits with status 1 as soon as a study has a missing
    input, a parameter of unexpected type, or variable inputs without any
    dataset; the files or paths to fix are printed beforehand.
    """

    def _first_marker(message):
        # Markers are probed in this fixed order; the first one found wins
        for marker in ("(V)", "(X)", "(!)"):
            if marker in message:
                return marker
        return None

    print()
    print(
        colored("> SETTINGS <", "blue", attrs=["reverse"]),
    )
    for study in self.studies:

        # Go to study directory (Path.cwd() is used in the messages below)
        os.chdir(self.working_dir / study)

        # Study banner
        print()
        print(colored(f"| {study} |", "magenta"))

        # ------------ #
        # Fixed inputs #
        # ------------ #
        line = [colored("> Common :", "blue")]
        errors = []
        json_listed = False
        wrong_type = False

        # Fixed parameters
        for message in self.fixed_params_messages[study]:
            marker = _first_marker(message)
            if marker == "(V)":
                line.append(colored(message, "green"))
            elif marker == "(X)":
                line.append(colored(message, "red"))
                if not json_listed:
                    # The json file is reported once, however many values are missing
                    errors.append(colored(f"> {Path.cwd() / 'inputs.json'}", "red"))
                    json_listed = True
            elif marker == "(!)":
                line.append(colored(message, "yellow"))
                wrong_type = True

        # Fixed paths (messages follow the order of self.fixed_paths[study])
        for position, message in enumerate(self.fixed_paths_messages[study]):
            marker = _first_marker(message)
            if marker == "(V)":
                line.append(colored(message, "green"))
            elif marker == "(X)":
                name = self.fixed_paths[study][position]
                location = self.dict_user_paths[study][name]
                line.append(colored(message, "red"))
                errors.append(colored(f"> {location}", "red"))

        # Printing
        if len(line) == 1:
            print(colored("None.", "blue"))
        else:
            print(*line)

        if not (self.fixed_params_config[study] and self.fixed_paths_config[study]):
            print()
            print(colored("(X) Please set inputs :", "red"))
            for error in errors:
                print(error)
            sys.exit(1)

        if wrong_type:
            print()
            print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
            print(colored(f"> {Path.cwd() / 'inputs.json'}", "red"))
            sys.exit(1)

        # --------------- #
        # Variable inputs #
        # --------------- #
        errors = []
        csv_listed = False
        wrong_type = False

        if self.variable_params[study] or self.variable_paths[study]:

            datasets = self.dict_variable_params[study].index

            # Check if datasets have been defined
            if len(datasets) == 0:
                print()
                print(colored("(X) Please define at least one dataset in file :", "red"))
                print(colored(f"> {Path.cwd() / 'inputs.csv'}", "red"))
                sys.exit(1)

            for index in datasets:

                line = [colored(f"> {index} :", "blue")]

                # Variable parameters
                for message in self.variable_params_messages[study][index]:
                    marker = _first_marker(message)
                    if marker == "(V)":
                        line.append(colored(message, "green"))
                    elif marker == "(X)":
                        line.append(colored(message, "red"))
                        if not csv_listed:
                            # The csv file is reported once for all datasets
                            errors.append(colored(f"> {Path.cwd() / 'inputs.csv'}", "red"))
                            csv_listed = True
                    elif marker == "(!)":
                        line.append(colored(message, "yellow"))
                        wrong_type = True

                # Variable paths (messages follow the order of self.variable_paths[study])
                for position, message in enumerate(self.variable_paths_messages[study][index]):
                    marker = _first_marker(message)
                    if marker == "(V)":
                        line.append(colored(message, "green"))
                    elif marker == "(X)":
                        name = self.variable_paths[study][position]
                        location = self.dict_user_paths[study][name][index]
                        line.append(colored(message, "red"))
                        errors.append(colored(f"> {location}", "red"))

                # Printing
                print(*line)

            # The csv file, when reported, is listed before the missing paths
            errors.sort(key=lambda text: "inputs.csv" not in text)
            if errors:
                print()
                print(colored("(X) Please set inputs :", "red"))
                for error in errors:
                    print(error)
                sys.exit(1)

            if wrong_type:
                print()
                print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
                print(colored(f"> {Path.cwd() / 'inputs.csv'}", "red"))
                sys.exit(1)

        # Go back to working directory
        os.chdir(self.working_dir)

1413 

def init_paths(self):
    """Initialize dictionary containing all paths.

    For each study of ``self.studies``, ``self.dict_paths[study]`` is
    loaded from the ``.paths.json`` file of the study. When that file is
    missing, unreadable or does not contain a JSON object, a fresh
    dictionary mapping every entry of ``self.output_paths`` to ``None`` is
    used instead. Per-dataset entries whose dataset is not listed in
    ``self.dict_datasets[study]`` are then dropped (a study without any
    entry in ``self.dict_datasets`` is treated as having no dataset).
    """

    # Loop over studies
    for study in self.studies:

        # Define paths file of the study
        paths_file: Path = self.working_dir / study / ".paths.json"

        # Only I/O and parsing errors mean "no usable file"; anything else
        # (KeyboardInterrupt, SystemExit, ...) must propagate
        try:
            with open(paths_file) as f:
                dict_paths = json.load(f)
        except (OSError, ValueError):
            dict_paths = None

        if not isinstance(dict_paths, dict):
            dict_paths = {path: None for path in self.output_paths}

        # Purge old datasets
        datasets = self.dict_datasets.get(study, [])
        for value in dict_paths.values():
            if isinstance(value, dict):
                # Listed first: a dict cannot shrink while being iterated
                to_delete = [dataset for dataset in value if dataset not in datasets]
                for dataset in to_delete:
                    del value[dataset]

        self.dict_paths[study] = dict_paths

1440 

def update_analysis(self):
    """Synchronize the analysis settings of every study with its outputs.

    For each study of ``self.studies``, ``self.dict_analysis[study]`` is
    loaded from the ``analysis.json`` file of the study (empty when the
    file does not exist), then, for each output of
    ``self.dict_paths[study]``:

    - an entry is created when the output has none;
    - when the output is per-dataset (its value is a dict), every dataset
      without settings receives a copy of ``self.analysis_settings[output]``
      (``{}`` when undefined), and settings of datasets that are no longer
      part of the output are removed.

    The result is written back to ``analysis.json``.
    """

    # Local import: copies keep the settings of each case independent
    import copy

    # Loop over studies
    for study in self.studies:

        # Define analysis file
        analysis_file = self.working_dir / study / "analysis.json"

        # Initialize analysis dictionary
        if os.path.exists(analysis_file):
            with open(analysis_file) as f:
                analysis = json.load(f)
        else:
            analysis = {}
        self.dict_analysis[study] = analysis

        # Browse all outputs
        for out, value in self.dict_paths[study].items():

            defaults = self.analysis_settings.get(out, {})
            cases = analysis.setdefault(out, {})

            # Only per-dataset outputs have cases to add or remove
            if not isinstance(value, dict):
                continue

            # Add missing cases. A deep copy is stored so that editing the
            # settings of one case affects neither the other cases nor the
            # defaults held in self.analysis_settings
            for case in value:
                if case not in cases:
                    cases[case] = copy.deepcopy(defaults)

            # Delete cases that are not part of the output anymore
            for case in [case for case in cases if case not in value]:
                del cases[case]

        with open(analysis_file, "w") as f:
            json.dump(analysis, f, indent=4)

1490 

def clean_outputs(self):
    """Clean outputs.

    For each study of ``self.dict_studies``, every output flagged in its
    ``"clean_outputs"`` dictionary is deleted from disk, using the
    location(s) recorded in ``self.dict_paths[study]``: a single path, or
    one path per dataset. Outputs without a recorded location (unknown
    key, ``None`` value) and locations that do not exist are ignored.
    """

    # Function to remove output path, either file or directory
    def _remove_output(output):
        # None (or any non-path value) means there is nothing to delete
        if not isinstance(output, (str, os.PathLike)):
            return
        output_path = Path(output)
        if output_path.is_dir():
            shutil.rmtree(output_path)
        elif output_path.exists():
            output_path.unlink()

    # Loop over studies
    for study, study_dict in self.dict_studies.items():

        study_paths = self.dict_paths.get(study, {})

        # Delete specified outputs
        for key, flagged in study_dict["clean_outputs"].items():
            if not flagged:
                continue

            target = study_paths.get(key)
            if isinstance(target, dict):
                # One location per dataset
                for location in target.values():
                    _remove_output(location)
            else:
                _remove_output(target)

1514 

def purge_output_datasets(self,
    study: str,
):
    """Purge output datasets for a specific study.

    Every entry of the current working directory whose (resolved) name is
    not one of the datasets of ``self.dict_datasets[study]`` is deleted.
    Dataset IDs are compared as strings, since directory names are
    strings whereas IDs may have been parsed as numbers.
    """

    datasets = {str(dataset) for dataset in self.dict_datasets[study]}

    for path in list(Path.cwd().iterdir()):
        if path.resolve().name in datasets:
            continue
        # shutil.rmtree only accepts real directories: stray files and
        # symbolic links are unlinked instead
        if path.is_dir() and not path.is_symlink():
            shutil.rmtree(path)
        else:
            path.unlink()

1525 

def update_workflow_diagram(self,
    process: Process,
):
    """Record the inputs and outputs of a process in the workflow diagram.

    The entry is stored in ``self.diagram`` under ``process.name``; any
    previous entry with that name is replaced.
    """

    entry = {}

    # Parameters: values of the mapping, then the full list as is
    entry["params"] = list(process.params.values())
    entry["allparams"] = process.allparams

    # Input paths: same layout as the parameters
    entry["paths"] = list(process.paths.values())
    entry["allpaths"] = process.allpaths

    # Paths required from / produced for the rest of the workflow
    entry["required_paths"] = list(process.required_paths.values())
    entry["output_paths"] = list(process.output_paths.values())

    self.diagram[process.name] = entry

1539 

1540 def __call__(self): 

1541 """Launch workflow of processes.""" 

1542 

1543 # --------------- # 

1544 # Launch workflow # 

1545 # --------------- # 

1546 print() 

1547 print( 

1548 colored("> RUNNING <", "blue", attrs=["reverse"]), 

1549 ) 

1550 

1551 for study, dict_study in self.dict_studies.items(): 

1552 

1553 # Check if study must be executed 

1554 if not dict_study["execute"]: 

1555 

1556 # Printing 

1557 print() 

1558 print( 

1559 colored(f"| {study} |", "magenta"), 

1560 ) 

1561 print() 

1562 print(colored("(!) Study is skipped.", "yellow")) 

1563 

1564 continue 

1565 

1566 study_dir:Path = self.working_dir / study 

1567 os.chdir(study_dir) 

1568 

1569 for step, proc in enumerate(self.list_workflow): 

1570 

1571 # Update analysis 

1572 self.update_analysis() 

1573 

1574 if "hard_params" in proc: dict_hard_params = proc["hard_params"] 

1575 else: dict_hard_params = {} 

1576 if "user_params" in proc: user_params = proc["user_params"] 

1577 else: user_params = {} 

1578 if "user_paths" in proc: user_paths = proc["user_paths"] 

1579 else: user_paths = {} 

1580 if "required_paths" in proc: required_paths = proc["required_paths"] 

1581 else: required_paths = {} 

1582 if "output_paths" in proc: output_paths = proc["output_paths"] 

1583 else: output_paths = {} 

1584 if "overall_analysis" in proc: overall_analysis = proc["overall_analysis"] 

1585 else: overall_analysis = {} 

1586 

1587 # Define class object for the current process 

1588 process = proc["process"] 

1589 this_process:Process = process( 

1590 study=study, 

1591 df_user_params=self.dict_variable_params[study], 

1592 dict_user_params=self.dict_fixed_params[study], 

1593 dict_user_paths=self.dict_user_paths[study], 

1594 dict_paths=self.dict_paths[study], 

1595 params=user_params, 

1596 paths=user_paths, 

1597 dict_hard_params=dict_hard_params, 

1598 fixed_params=self.fixed_params[study], 

1599 variable_params=self.variable_params[study], 

1600 fixed_paths=self.fixed_paths[study], 

1601 variable_paths=self.variable_paths[study], 

1602 required_paths=required_paths, 

1603 output_paths=output_paths, 

1604 overall_analysis=overall_analysis, 

1605 dict_analysis=self.dict_analysis[study], 

1606 silent=self.dict_process[study][self.list_processes[step]]["silent"], 

1607 diagram=self.diagram, 

1608 ) 

1609 

1610 # Define process name 

1611 this_process.name = this_process.__class__.__name__ 

1612 

1613 # Define working folder associated to the current process 

1614 folder_name = f"{step+1}_{this_process.name}" 

1615 folder_path:Path = study_dir / folder_name 

1616 folder_path.mkdir(exist_ok=True, parents=True) 

1617 os.chdir(folder_path) 

1618 

1619 # Initialize process 

1620 this_process.initialize() 

1621 

1622 # Check if process must be executed 

1623 if not self.dict_process[study][self.list_processes[step]]["execute"]: 

1624 

1625 # Printing 

1626 print() 

1627 print( 

1628 colored(f"| {study} | {this_process.name} |", "magenta"), 

1629 ) 

1630 print() 

1631 print(colored("(!) Process is skipped.", "yellow")) 

1632 

1633 # Update workflow diagram 

1634 self.update_workflow_diagram(this_process) 

1635 

1636 continue 

1637 

1638 if this_process.is_case: 

1639 

1640 # Define sub-folders associated to each ID of the inputs dataframe 

1641 for idx in this_process.df_params.index: 

1642 

1643 # Printing 

1644 print() 

1645 print( 

1646 colored(f"| {study} | {this_process.name} | {idx} |", "magenta"), 

1647 ) 

1648 

1649 # Check if dataset must be executed 

1650 if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0: 

1651 

1652 # Printing 

1653 print() 

1654 print(colored("(!) Experiment is skipped.", "yellow")) 

1655 

1656 # Go back to working folder 

1657 os.chdir(folder_path) 

1658 

1659 # Purge old output datasets 

1660 self.purge_output_datasets(study) 

1661 

1662 # Update workflow diagram 

1663 self.update_workflow_diagram(this_process) 

1664 

1665 continue 

1666 

1667 # Update process index 

1668 this_process.index = idx 

1669 

1670 subfolder_path = study_dir / folder_name / str(idx) 

1671 subfolder_path.mkdir(exist_ok=True, parents=True) 

1672 os.chdir(subfolder_path) 

1673 

1674 # Launch process 

1675 this_process() 

1676 this_process.finalize() 

1677 

1678 # Go back to working folder 

1679 os.chdir(folder_path) 

1680 

1681 # Purge old output datasets 

1682 self.purge_output_datasets(study) 

1683 

1684 else: 

1685 

1686 # Printing 

1687 print() 

1688 print( 

1689 colored(f"| {study} | {this_process.name} |", "magenta"), 

1690 ) 

1691 

1692 # Launch process 

1693 this_process() 

1694 this_process.finalize() 

1695 

1696 # Update workflow diagram 

1697 self.update_workflow_diagram(this_process) 

1698 

1699 # Update paths dictonary 

1700 self.dict_paths[study] = this_process.dict_paths 

1701 

1702 # Write paths json file 

1703 with open(study_dir / ".paths.json", "w") as f: 

1704 json.dump(self.dict_paths[study], f, indent=4) 

1705 

1706 # Go back to study directory 

1707 os.chdir(study_dir) 

1708 

1709 # Write diagram json file 

1710 with open(".diagram.json", "w") as f: 

1711 json.dump(self.diagram, f, indent=4) 

1712 

1713 # Go back to working directory 

1714 os.chdir(self.working_dir) 

1715 

1716 # Delete unecessary outputs 

1717 self.clean_outputs()