Coverage for src \ nuremics \ core \ workflow.py: 81%

939 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 18:48 +0100

1from __future__ import annotations 

2 

3import json 

4import os 

5import pathlib 

6import shutil 

7import sys 

8from importlib.resources import files 

9from pathlib import Path 

10 

11import numpy as np 

12import pandas as pd 

13from termcolor import colored 

14 

15from .process import Process 

16from .utils import ( 

17 extract_analysis, 

18 extract_inputs_and_types, 

19 extract_outputs, 

20 get_self_method_calls, 

21 only_function_calls, 

22) 

23 

24 

25class WorkFlow: 

26 

def __init__(
    self,
    app_name: str,
    config_path: Path,
    workflow: list,
    silent: bool = False,
) -> None:
    """
    Initialize the workflow state and synchronize the settings file.

    Parameters
    ----------
    app_name : str
        Application name, used as key of the "apps" section of the settings.
    config_path : Path
        Directory holding "settings.json"; created when missing.
    workflow : list
        Workflow description; each item provides a "process" entry whose
        ``__name__`` is collected into ``list_processes``.
    silent : bool
        Stored as-is on the instance.
    """
    # Constructor arguments
    self.app_name = app_name
    self.config_path = config_path
    self.list_workflow = workflow
    self.silent = silent

    # Lists filled while the workflow is analysed
    self.user_params = []
    self.user_paths = []
    self.output_paths = []
    self.overall_analysis = []

    # Generic dictionaries
    self.dict_inputs = {}
    self.dict_datasets = {}
    self.dict_studies = {}
    self.dict_process = {}
    self.dict_analysis = {}
    self.dict_fixed_params = {}
    self.dict_variable_params = {}
    self.dict_user_paths = {}
    self.dict_paths = {}
    self.params_type = {}
    self.diagram = {}

    # Per-process information
    self.operations_by_process = {}
    self.inputs_by_process = {}
    self.params_by_process = {}
    self.paths_by_process = {}
    self.outputs_by_process = {}
    self.analysis_by_process = {}
    self.settings_by_process = {}

    # Plugs between process variables and user definitions
    self.params_plug = {}
    self.paths_plug = {}
    self.outputs_plug = {}
    self.analysis_plug = {}

    # Per-study status
    self.studies_modif = {}
    self.studies_messages = {}
    self.studies_config = {}

    # Fixed / variable inputs bookkeeping
    self.fixed_params_messages = {}
    self.fixed_params_config = {}
    self.fixed_paths_messages = {}
    self.fixed_paths_config = {}
    self.variable_params_messages = {}
    self.variable_params_config = {}
    self.variable_paths_messages = {}
    self.variable_paths_config = {}
    self.fixed_params = {}
    self.fixed_paths = {}
    self.variable_params = {}
    self.variable_paths = {}

    # Make sure the configuration directory is available
    self.config_path.mkdir(parents=True, exist_ok=True)
    settings_file = self.config_path / "settings.json"

    def write_settings(payload: dict) -> None:
        # Serialize the given settings into the settings file
        with open(settings_file, "w") as stream:
            json.dump(payload, stream, indent=4)

    # Seed the settings file on first use
    if not settings_file.exists():
        write_settings({"default_working_dir": None, "apps": {}})

    # Load the settings currently on disk
    with open(settings_file) as stream:
        self.dict_settings = json.load(stream)

    # Register the application when it is not known yet
    self.dict_settings["apps"].setdefault(self.app_name, {"working_dir": None})

    # Without a default working directory, adopt the first one
    # configured by any application (if there is one)
    if self.dict_settings["default_working_dir"] is None:
        first_defined = next(
            (
                app["working_dir"]
                for app in self.dict_settings["apps"].values()
                if app["working_dir"] is not None
            ),
            None,
        )
        if first_defined is not None:
            self.dict_settings["default_working_dir"] = first_defined

    # Persist the (possibly completed) settings
    write_settings(self.dict_settings)

    # Names of the processes, in workflow order
    self.list_processes = [
        proc["process"].__name__ for proc in self.list_workflow
    ]

139 

def print_logo(self) -> None:
    """
    Print the ASCII logo stored in the "nuremics.resources" package.

    Every printed line is right-stripped and colored in yellow, after one
    leading blank line.
    """
    ascii_logo_path = files("nuremics.resources").joinpath("logo.txt")

    # Read through a context manager so the handle is always closed, and
    # so an empty file yields no lines instead of an unbound variable.
    with open(ascii_logo_path) as f:
        # NOTE(review): the first line of the file is deliberately left out
        # to keep the historical output unchanged (the former read loop
        # consumed it before collecting the remaining lines) -- confirm
        # whether "logo.txt" relies on this.
        lines = f.readlines()[1:]

    print()
    for line in lines:
        print(colored(line.rstrip(), "yellow"))

149 

def print_application(self) -> None:
    """
    Print the workflow diagram: the application, its processes and the
    operations called by each process.

    As a side effect, ``operations_by_process`` is filled for every process.
    The program exits with status 1 when a process ``__call__`` is reported
    as invalid by ``only_function_calls``.
    """
    # Header
    print()
    print(colored("> APPLICATION <", "blue", attrs=["reverse"]))
    print()
    print(colored("| Workflow |", "magenta"))
    print(colored(f"{self.app_name}_____", "blue"))

    # Horizontal offset taken by the application name in the diagram
    nb_spaces_app = len(self.app_name) + 5
    app_indent = " " * nb_spaces_app
    last_index = len(self.list_workflow) - 1

    has_invalid_process = False
    for index, item in enumerate(self.list_workflow):

        process_cls = item["process"]
        proc_name = process_cls.__name__
        this_process: Process = process_cls()
        has_successor = index < last_index

        # Horizontal offset taken by the process name in the diagram
        nb_spaces_proc = len(proc_name) + 10

        # Operations called by the current process
        operations = get_self_method_calls(this_process.__class__)
        self.operations_by_process[proc_name] = operations

        # The process call must only contain calls to its operations
        valid_call = only_function_calls(
            method=this_process.__call__,
            allowed_methods=self.operations_by_process[proc_name],
        )

        branch = colored(app_indent + f"|_____{proc_name}_____", "blue")
        if not valid_call:
            print(branch + colored("(X)", "red"))
            has_invalid_process = True
        else:
            print(branch)
            # The vertical rail only continues when another process follows
            rail = "|" if has_successor else " "
            for op_name in self.operations_by_process[proc_name]:
                text = app_indent + rail + " " * nb_spaces_proc + f"|_____{op_name}"
                print(colored(text, "blue"))

        if has_successor:
            print(colored(app_indent + "|", "blue"))

    if has_invalid_process:
        # ``None`` stands for an empty line in the help message
        help_lines = (
            None,
            "(X) Each process must only call its internal function(s):",
            None,
            " def __call__(self):",
            " super().__call__()",
            None,
            " self.operation1()",
            " self.operation2()",
            " self.operation3()",
            " ...",
        )
        for help_line in help_lines:
            if help_line is None:
                print()
            else:
                print(colored(help_line, "red"))
        sys.exit(1)

228 

def set_working_directory(self) -> None:
    """
    Resolve, create and enter the working directory of the application.

    The directory is "<working_dir>/<app_name>", where "working_dir" comes
    from the application settings. The program exits with status 1 when
    that setting is ``None``. The settings file is rewritten and the
    current working directory of the process is changed.
    """
    settings_file = self.config_path / "settings.json"
    configured_dir = self.dict_settings["apps"][self.app_name]["working_dir"]

    # Nothing can be done until the user configures the working directory
    if configured_dir is None:
        print()
        print(colored(f'(X) Please define {self.app_name} "working_dir" in file :', "red"))
        print(colored(f"> {settings_file}", "red"))
        sys.exit(1)

    self.working_dir = Path(configured_dir) / self.app_name

    # Write settings file
    with open(settings_file, "w") as stream:
        json.dump(self.dict_settings, stream, indent=4)

    # Create the working directory, then move into it
    self.working_dir.mkdir(parents=True, exist_ok=True)
    os.chdir(self.working_dir)

261 

def get_inputs(self) -> None:
    """
    Collect the inputs of every process and plug them to the workflow
    definitions.

    For each process, inputs are split into parameters, paths (subclasses
    of ``pathlib.Path``) and analysis inputs, and each one is associated
    with its definition found in the workflow item ("user_params",
    "hard_params", "user_paths", "required_paths", "overall_analysis"),
    or with ``None`` when no definition is found.
    """

    def find_plug(item: dict, key: str, first: str, second: str) -> list | None:
        # Return [definition, section] from the first section declaring
        # ``key``, looking in ``first`` then ``second``; None otherwise.
        for section in (first, second):
            if (section in item) and (key in item[section]):
                return [item[section][key], section]
        return None

    for item in self.list_workflow:

        process_cls = item["process"]
        name = process_cls.__name__
        this_process: Process = process_cls()

        self.inputs_by_process[name] = extract_inputs_and_types(this_process)
        self.analysis_by_process[name] = extract_analysis(this_process)

        if "settings" in item:
            self.settings_by_process[name] = item["settings"]

        self.params_by_process[name] = {}
        self.paths_by_process[name] = []
        self.params_plug[name] = {}
        self.paths_plug[name] = {}
        self.analysis_plug[name] = {}

        for key, value_type in self.inputs_by_process[name].items():

            # Printable type name, qualified by its module unless builtin
            module_name = value_type.__module__
            type_name = value_type.__name__
            type_label = type_name if module_name == "builtins" else f"{module_name}.{type_name}"

            if key in self.analysis_by_process[name]:
                # Analysis input
                if ("overall_analysis" in item) and (key in item["overall_analysis"]):
                    self.analysis_plug[name][key] = item["overall_analysis"][key]
                else:
                    self.analysis_plug[name][key] = None

            elif issubclass(value_type, pathlib.Path):
                # Input path
                self.paths_by_process[name].append(key)
                self.paths_plug[name][key] = find_plug(item, key, "user_paths", "required_paths")

            else:
                # Input parameter
                self.params_by_process[name][key] = [value_type, type_label]
                self.params_plug[name][key] = find_plug(item, key, "user_params", "hard_params")

317 

def get_outputs(self) -> None:
    """
    Collect the outputs of every process and plug them to the
    "output_paths" definitions of the workflow.

    An output without definition in the workflow item is plugged to None.
    """
    for item in self.list_workflow:

        process_cls = item["process"]
        name = process_cls.__name__
        this_process: Process = process_cls()

        self.outputs_by_process[name] = extract_outputs(this_process)

        declares_outputs = "output_paths" in item
        self.outputs_plug[name] = {
            output: (
                item["output_paths"][output]
                if declares_outputs and (output in item["output_paths"])
                else None
            )
            for output in self.outputs_by_process[name]
        }

334 

def init_config(self) -> None:
    """
    Validate the workflow definitions and gather the user inputs/outputs.

    Fills ``user_params``, ``user_paths``, ``output_paths`` and
    ``overall_analysis`` (duplicates removed, first occurrence kept).
    The program exits with status 1 on the first inconsistent definition.
    """

    def abort(message: str) -> None:
        # Report the inconsistency in red and stop the program
        print()
        print(colored(message, "red"))
        sys.exit(1)

    for item in self.list_workflow:

        name = item["process"].__name__

        # User parameters must match input parameters of the process
        for key, value in item.get("user_params", {}).items():
            if key not in self.params_by_process[name]:
                abort(f'(X) {key} defined in "user_params" is not an input parameter of {name}.')
            self.user_params.append(value)

        # Hard parameters must match input parameters of the process
        for key in item.get("hard_params", {}):
            if key not in self.params_by_process[name]:
                abort(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.')

        # User paths must match input paths of the process
        for key, value in item.get("user_paths", {}).items():
            if key not in self.paths_by_process[name]:
                abort(f"(X) {key} is not an input path of {name}.")
            self.user_paths.append(value)

        # Required paths must be produced by a previous process
        for value in item.get("required_paths", {}).values():
            if value not in self.output_paths:
                abort(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".')

        # Output paths must match outputs of the process and be unique
        for key, value in item.get("output_paths", {}).items():
            if key not in self.outputs_by_process[name]:
                abort(f"(X) {key} is not an output path of {name}.")
            if value in self.output_paths:
                abort(f'(X) {value} is defined twice in "output_paths".')
            self.output_paths.append(value)

        # Analysis inputs must match the process and refer to known outputs
        for key, value in item.get("overall_analysis", {}).items():
            if key not in self.analysis_by_process[name]:
                abort(f"(X) {key} is not an output analysis of {name}.")
            self.overall_analysis.append(value)
            if value not in self.output_paths:
                abort(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".')

    # Remove duplicates while keeping the order of first appearance
    self.user_params = list(dict.fromkeys(self.user_params))
    self.user_paths = list(dict.fromkeys(self.user_paths))
    self.overall_analysis = list(dict.fromkeys(self.overall_analysis))

411 

def print_processes(self) -> None:
    """
    Print, for every process, its input parameters, input paths, analysis
    inputs and output paths next to the definitions they are plugged to.

    A variable without definition is displayed as "Not defined (X)" in
    red; after printing a table containing such a row, the program exits
    with status 1.
    """
    # Row displayed for a variable that has no definition
    undefined_row = ("Not defined", "(X)")

    def show_table(left_rows: list, right_rows: list, failure_text: str) -> None:
        # Print the aligned "process -----||----- user" table, then stop
        # the program when at least one row is undefined.
        left_widths = [
            max(len(row[col]) for row in left_rows) + 1
            for col in range(len(left_rows[0]))
        ]
        right_widths = [
            max(len(row[col]) for row in right_rows) + 1
            for col in range(2)
        ]
        for left, right in zip(left_rows, right_rows):
            proc_str = "".join(cell.ljust(width) for cell, width in zip(left, left_widths)) + "-----|"
            user_str = "|----- " + "".join(cell.ljust(width) for cell, width in zip(right, right_widths))
            color = "red" if "(X)" in user_str else "green"
            print(colored(proc_str, "blue") + colored(user_str, color))

        if any(row is undefined_row for row in right_rows):
            print()
            print(colored(failure_text, "red"))
            sys.exit(1)

    for item in self.list_workflow:

        name = item["process"].__name__

        print()
        print(colored(f"| {name} |", "magenta"))

        # Input parameters: (type) name -----||----- definition (origin)
        print(colored("> Input Parameter(s) :", "blue"))
        if len(self.params_by_process[name]) == 0:
            print(colored("None.", "blue"))
        else:
            left_rows = []
            right_rows = []
            for key, value in self.params_by_process[name].items():
                left_rows.append((f"({value[1]})", key))
                plug = self.params_plug[name][key]
                if plug is None:
                    right_rows.append(undefined_row)
                else:
                    right_rows.append((str(plug[0]), f"({plug[1]})"))
            show_table(
                left_rows,
                right_rows,
                '(X) Please define all input parameters either in "user_params" or "hard_params".',
            )

        # Input paths: name -----||----- definition (origin)
        print(colored("> Input Path(s) :", "blue"))
        if len(self.paths_by_process[name]) == 0:
            print(colored("None.", "blue"))
        else:
            left_rows = []
            right_rows = []
            for path in self.paths_by_process[name]:
                left_rows.append((path,))
                plug = self.paths_plug[name][path]
                if plug is None:
                    right_rows.append(undefined_row)
                else:
                    right_rows.append((plug[0], f"({plug[1]})"))
            show_table(
                left_rows,
                right_rows,
                '(X) Please define all input paths either in "user_paths" or "required_paths".',
            )

        # Analysis inputs: name -----||----- definition (overall_analysis)
        print(colored("> Input Analysis :", "blue"))
        if len(self.analysis_by_process[name]) == 0:
            print(colored("None.", "blue"))
        else:
            left_rows = []
            right_rows = []
            for out in self.analysis_by_process[name]:
                left_rows.append((out,))
                plug = self.analysis_plug[name][out]
                if plug is None:
                    right_rows.append(undefined_row)
                else:
                    right_rows.append((plug, "(overall_analysis)"))
            show_table(
                left_rows,
                right_rows,
                '(X) Please define all output analysis in "overall_analysis".',
            )

        # Output paths: name -----||----- definition (output_paths)
        print(colored("> Output Path(s) :", "blue"))
        if len(self.outputs_by_process[name]) == 0:
            print(colored("None.", "blue"))
        else:
            left_rows = []
            right_rows = []
            for path in self.outputs_by_process[name]:
                left_rows.append((path,))
                plug = self.outputs_plug[name][path]
                if plug is None:
                    right_rows.append(undefined_row)
                else:
                    right_rows.append((plug, "(output_paths)"))
            show_table(
                left_rows,
                right_rows,
                '(X) Please define all output paths in "output_paths".',
            )

610 

def set_user_params_types(self) -> None:
    """
    Record in ``params_type`` the type of every user parameter.

    The program exits with status 1 when the same user parameter is
    plugged to process parameters of different types.
    """
    for proc, params in self.params_by_process.items():
        for param, type_info in params.items():

            user_param = self.params_plug[proc][param][0]

            # Only parameters exposed to the user are of interest
            if user_param not in self.user_params:
                continue

            # A user parameter already recorded must keep the same type
            if (user_param in self.params_type) and (self.params_type[user_param][0] != type_info[0]):
                print()
                print(colored(f"(X) {user_param} is defined both as ({self.params_type[user_param][1]}) and ({type_info[1]}) :", "red"))
                print(colored('> Please consider defining a new user parameter in "user_params".', "red"))
                sys.exit(1)

            self.params_type[user_param] = type_info

624 

def print_io(self) -> None:
    """
    Print the user parameters (with their type), the user paths and the
    output paths of the workflow; "None." is printed for an empty list.
    """

    def title(text: str) -> None:
        # Section banner displayed in reverse video
        print()
        print(colored(text, "blue", attrs=["reverse"]))

    def nothing() -> None:
        # Placeholder for an empty list
        print(colored("None.", "blue"))

    title("> INPUTS <")

    # User parameters
    print()
    print(colored("| User Parameters |", "magenta"))
    for param, type_info in self.params_type.items():
        print(colored(f"> {param} ({type_info[1]})", "blue"))
    if len(list(self.params_type.items())) == 0:
        nothing()

    # User paths
    print()
    print(colored("| User Paths |", "magenta"))
    for path in self.user_paths:
        print(colored(f"> {path}", "blue"))
    if len(self.user_paths) == 0:
        nothing()

    title("> OUTPUTS <")

    # Output paths
    print()
    for path in self.output_paths:
        print(colored(f"> {path}", "blue"))
    if len(self.output_paths) == 0:
        nothing()

675 

def define_studies(self) -> None:
    """
    Load "studies.json" from the working directory, creating it with
    empty "studies" and "config" entries when it does not exist.

    Sets ``studies`` from the "studies" entry; the program exits with
    status 1 when that entry is empty.
    """
    self.studies_file = self.working_dir / "studies.json"

    if not self.studies_file.exists():
        # First run: write a skeleton for the user to fill in
        self.dict_studies["studies"] = []
        self.dict_studies["config"] = {}
        with open(self.studies_file, "w") as stream:
            json.dump(self.dict_studies, stream, indent=4)
    else:
        with open(self.studies_file) as stream:
            self.dict_studies = json.load(stream)

    print()
    print(colored("> STUDIES <", "blue", attrs=["reverse"]))

    # At least one study is needed to go further
    if len(self.dict_studies["studies"]) == 0:
        print()
        print(colored("(X) Please declare at least one study in file :", "red"))
        print(colored(f"> {self.studies_file}", "red"))
        sys.exit(1)

    self.studies = self.dict_studies["studies"]

703 

def init_studies(self) -> None:
    """
    Align the "config" section of the studies with the declared studies
    and with the current user parameters, user paths and output paths,
    then write the studies file.

    Obsolete entries are removed. Missing inputs are added as ``False``
    for the study named "Default" and ``None`` for any other study;
    missing "clean_outputs" entries are added as ``False``.
    """
    config = self.dict_studies["config"]

    # Drop the configuration of studies that are not declared anymore
    for study in list(config.keys()):
        if study not in self.studies:
            del config[study]

    # Drop obsolete entries, one section after the other
    sections = (
        ("user_params", self.user_params),
        ("user_paths", self.user_paths),
        ("clean_outputs", self.output_paths),
    )
    for section, allowed in sections:
        for study in list(config.keys()):
            for entry in list(config[study][section]):
                if entry not in allowed:
                    del config[study][section][entry]

    for study in self.studies:

        # New study: start from an empty configuration
        study_config = config.setdefault(
            study,
            {
                "execute": True,
                "user_params": {},
                "user_paths": {},
                "clean_outputs": {},
            },
        )

        # Inputs of "Default" are fixed unless stated otherwise; inputs
        # of the other studies are left to be configured
        initial = False if study == "Default" else None

        for param in self.user_params:
            if param not in study_config["user_params"]:
                study_config["user_params"][param] = initial

        for file in self.user_paths:
            if file not in study_config["user_paths"]:
                study_config["user_paths"][file] = initial

        for path in self.output_paths:
            if path not in study_config["clean_outputs"]:
                study_config["clean_outputs"][path] = False

        # Reorder inputs following the workflow order
        study_config["user_params"] = {key: study_config["user_params"][key] for key in self.user_params}
        study_config["user_paths"] = {key: study_config["user_paths"][key] for key in self.user_paths}

    # Write studies json file
    with open(self.studies_file, "w") as stream:
        json.dump(self.dict_studies, stream, indent=4)

765 

766 def test_studies_modification(self) -> None: 

767 

768 # Loop over studies 

769 for study in self.studies: 

770 

771 self.studies_modif[study] = False 

772 

773 study_file = Path(study) / ".study.json" 

774 if study_file.exists(): 

775 with open(study_file) as f: 

776 dict_study = json.load(f) 

777 if (self.dict_studies["config"][study]["user_params"] != dict_study["user_params"]) or \ 

778 (self.dict_studies["config"][study]["user_paths"] != dict_study["user_paths"]): 

779 self.studies_modif[study] = True 

780 

781 def test_studies_settings(self) -> None: 

782 

783 # Loop over studies 

784 for study in self.studies: 

785 

786 self.studies_messages[study] = [] 

787 self.studies_config[study] = True 

788 

789 for param in self.user_params: 

790 if self.dict_studies["config"][study]["user_params"][param] is None: 

791 self.studies_messages[study].append(f"(X) {param} not configured.") 

792 self.studies_config[study] = False 

793 else: 

794 if self.dict_studies["config"][study]["user_params"][param]: 

795 text = "variable" 

796 else: 

797 text = "fixed" 

798 self.studies_messages[study].append(f"(V) {param} is {text}.") 

799 

800 for file in self.user_paths: 

801 if self.dict_studies["config"][study]["user_paths"][file] is None: 

802 self.studies_messages[study].append(f"(X) {file} not configured.") 

803 self.studies_config[study] = False 

804 else: 

805 if self.dict_studies["config"][study]["user_paths"][file]: 

806 text = "variable" 

807 else: 

808 text = "fixed" 

809 self.studies_messages[study].append(f"(V) {file} is {text}.") 

810 

def print_studies(self) -> None:
    """
    Print the configuration status of every study.

    For a study flagged in ``studies_modif``, ``clean_output_tree`` is
    called and its "analysis.json" file is deleted when present. The
    program exits with status 1 at the first study that is not fully
    configured.
    """
    for study in self.studies:

        print()
        print(colored(f"| {study} |", "magenta"))

        if self.studies_modif[study]:
            print(colored("(!) Configuration has been modified.", "yellow"))
            self.clean_output_tree(study)

            # Delete analysis file
            analysis_file = Path(study) / "analysis.json"
            if analysis_file.exists():
                analysis_file.unlink()

        # Valid messages in green, invalid ones in red, others not printed
        for message in self.studies_messages[study]:
            if "(V)" in message:
                color = "green"
            elif "(X)" in message:
                color = "red"
            else:
                continue
            print(colored(message, color))

        if not self.studies_config[study]:
            print()
            print(colored("(X) Please configure file :", "red"))
            print(colored(f"> {Path.cwd() / 'studies.json'}", "red"))
            sys.exit(1)

842 

def init_process_settings(self) -> None:
    """
    Build the process settings of every study and write them to the
    "process.json" file of the study.

    Settings already stored in the file are kept for the processes of the
    workflow; other processes get ``{"execute": True, "silent": self.silent}``.
    Entries of unknown processes are dropped and the result follows the
    order of ``list_processes``.
    """
    for study in self.studies:

        process_file = Path(study) / "process.json"

        # Settings previously stored for this study (if any)
        stored = {}
        if os.path.exists(process_file):
            with open(process_file) as stream:
                stored = json.load(stream)

        # Keep stored settings of known processes, default the others
        self.dict_process[study] = {
            process: (
                stored[process]
                if process in stored
                else {"execute": True, "silent": self.silent}
            )
            for process in self.list_processes
        }

        # Write process json file
        with open(process_file, "w") as stream:
            json.dump(self.dict_process[study], stream, indent=4)

875 

def configure_inputs(self) -> None:
    """
    Split, for every study, the user parameters and user paths into
    variable ones (configured exactly as ``True``) and fixed ones (any
    other value), and store the four lists per study.
    """
    for study in self.studies:

        params_config = self.dict_studies["config"][study]["user_params"]
        variable_params = [key for key, value in params_config.items() if value is True]
        fixed_params = [key for key, value in params_config.items() if value is not True]

        paths_config = self.dict_studies["config"][study]["user_paths"]
        variable_paths = [key for key, value in paths_config.items() if value is True]
        fixed_paths = [key for key, value in paths_config.items() if value is not True]

        self.fixed_params[study] = fixed_params
        self.variable_params[study] = variable_params
        self.fixed_paths[study] = fixed_paths
        self.variable_paths[study] = variable_paths

902 

903 def init_data_tree(self) -> None: 

904 

905 # Loop over studies 

906 for study in self.studies: 

907 

908 # Initialize study directory 

909 study_dir: Path = self.working_dir / study 

910 study_dir.mkdir( 

911 exist_ok=True, 

912 parents=True, 

913 ) 

914 

915 # Write study json file 

916 with open(study_dir / ".study.json", "w") as f: 

917 json.dump(self.dict_studies["config"][study], f, indent=4) 

918 

919 # Initialize inputs csv 

920 inputs_file: Path = study_dir / "inputs.csv" 

921 if (len(self.variable_params[study]) > 0) or \ 

922 (len(self.variable_paths[study]) > 0): 

923 

924 if not inputs_file.exists(): 

925 

926 # Create empty input dataframe 

927 df_inputs = pd.DataFrame(columns=["ID"] + self.variable_params[study] + ["EXECUTE"]) 

928 

929 # Write input dataframe 

930 df_inputs.to_csv( 

931 path_or_buf=inputs_file, 

932 index=False, 

933 ) 

934 

935 else: 

936 

937 # Read input dataframe 

938 df_inputs = pd.read_csv( 

939 filepath_or_buffer=inputs_file, 

940 index_col=0, 

941 ) 

942 

943 # Update variable parameters 

944 df_inputs = df_inputs.assign(**{param: np.nan for param in self.variable_params[study] if param not in df_inputs.columns}) 

945 df_inputs = df_inputs[[col for col in self.variable_params[study] if col in df_inputs.columns] + ["EXECUTE"]] 

946 

947 # Set default execution 

948 df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int) 

949 

950 # Write input dataframe 

951 df_inputs.to_csv( 

952 path_or_buf=inputs_file, 

953 ) 

954 

955 # Define list of datasets 

956 self.dict_datasets[study] = df_inputs.index.tolist() 

957 

958 # Delete file 

959 elif inputs_file.exists(): 

960 inputs_file.unlink() 

961 

962 # Initialize inputs json file 

963 inputs_file: Path = study_dir / "inputs.json" 

964 if (len(self.fixed_params[study]) > 0) or \ 

965 (len(self.fixed_paths[study]) > 0) or \ 

966 (len(self.variable_paths[study]) > 0): 

967 

968 # Create file 

969 if not inputs_file.exists(): 

970 

971 # Initialize dictionary 

972 dict_inputs = {} 

973 if len(self.fixed_params[study]) > 0: 

974 for param in self.fixed_params[study]: 

975 dict_inputs[param] = None 

976 if len(self.fixed_paths[study]) > 0: 

977 for path in self.fixed_paths[study]: 

978 dict_inputs[path] = None 

979 if len(self.variable_paths[study]) > 0: 

980 for path in self.variable_paths[study]: 

981 dict_inputs[path] = {} 

982 for index in df_inputs.index: 

983 dict_inputs[path][index] = None 

984 

985 # Write json 

986 with open(inputs_file, "w") as f: 

987 json.dump(dict_inputs, f, indent=4) 

988 

989 # Update file 

990 else: 

991 

992 # Read inputs json 

993 with open(inputs_file) as f: 

994 dict_inputs = json.load(f) 

995 

996 # Update fixed parameters 

997 dict_fixed_params = {k: dict_inputs.get(k, None) for k in self.fixed_params[study]} 

998 

999 # Update fixed paths 

1000 dict_fixed_paths = {} 

1001 for path in self.fixed_paths[study]: 

1002 value = dict_inputs.get(path, None) 

1003 if isinstance(value, dict): 

1004 dict_fixed_paths[path] = None 

1005 else: 

1006 dict_fixed_paths[path] = value 

1007 

1008 # Update variable paths 

1009 dict_variable_paths = {} 

1010 for path in self.variable_paths[study]: 

1011 existing_values = dict_inputs.get(path, {}) 

1012 if not isinstance(existing_values, dict): 

1013 existing_values = {} 

1014 dict_variable_paths[path] = { 

1015 idx: existing_values.get(idx) 

1016 for idx in df_inputs.index 

1017 } 

1018 

1019 # Update inputs dictionary 

1020 dict_inputs = {**dict_fixed_params, **dict_fixed_paths, **dict_variable_paths} 

1021 

1022 # Write inputs json 

1023 with open(inputs_file, "w") as f: 

1024 json.dump(dict_inputs, f, indent=4) 

1025 

1026 self.dict_inputs[study] = dict_inputs 

1027 

1028 else: 

1029 

1030 # Delete file 

1031 if inputs_file.exists(): 

1032 inputs_file.unlink() 

1033 

1034 self.dict_inputs[study] = {} 

1035 

1036 # Initialize inputs directory 

1037 inputs_dir: Path = study_dir / "0_inputs" 

1038 if len(self.user_paths) > 0: 

1039 

1040 # Create inputs directory (if necessary) 

1041 inputs_dir.mkdir( 

1042 exist_ok=True, 

1043 parents=True, 

1044 ) 

1045 

1046 # Delete fixed paths (if necessary) 

1047 input_paths = [f for f in inputs_dir.iterdir()] 

1048 for path in input_paths: 

1049 resolved_path = path.resolve().name 

1050 if (resolved_path not in self.fixed_paths[study]) and (resolved_path != "0_datasets"): 

1051 if Path(path).is_file(): 

1052 path.unlink() 

1053 else: 

1054 shutil.rmtree(path) 

1055 

1056 # Update inputs subfolders for variable paths 

1057 datasets_dir: Path = inputs_dir / "0_datasets" 

1058 if len(self.variable_paths[study]) > 0: 

1059 

1060 # Create datasets directory (if necessary) 

1061 datasets_dir.mkdir( 

1062 exist_ok=True, 

1063 parents=True, 

1064 ) 

1065 

1066 # Create subfolders (if necessary) 

1067 for index in df_inputs.index: 

1068 

1069 inputs_subfolder: Path = datasets_dir / index 

1070 inputs_subfolder.mkdir( 

1071 exist_ok=True, 

1072 parents=True, 

1073 ) 

1074 

1075 # Delete variable paths (if necessary) 

1076 input_paths = [f for f in inputs_subfolder.iterdir()] 

1077 for path in input_paths: 

1078 resolved_path = path.resolve().name 

1079 if resolved_path not in self.variable_paths[study]: 

1080 if Path(path).is_file(): 

1081 path.unlink() 

1082 else: 

1083 shutil.rmtree(path) 

1084 

1085 # Delete subfolders (if necessary) 

1086 inputs_subfolders = [f for f in datasets_dir.iterdir() if f.is_dir()] 

1087 for folder in inputs_subfolders: 

1088 id = os.path.split(folder)[-1] 

1089 if id not in self.dict_datasets[study]: 

1090 shutil.rmtree(folder) 

1091 

1092 # Delete datasets folder (if necessary) 

1093 elif datasets_dir.exists(): 

1094 shutil.rmtree(datasets_dir) 

1095 

1096 # Delete inputs directory (if necessary) 

1097 elif inputs_dir.exists(): 

1098 shutil.rmtree(inputs_dir) 

1099 

1100 # Delete useless study directories 

1101 studies_folders = [f for f in self.working_dir.iterdir() if f.is_dir()] 

1102 for folder in studies_folders: 

1103 if os.path.split(folder)[-1] not in self.studies: 

1104 shutil.rmtree(folder) 

1105 

def clean_output_tree(self,
    study: str,
) -> None:
    """Remove the generated content of a study while keeping its inputs.

    Every sub-directory of ``<working_dir>/<study>`` is deleted except
    ``0_inputs``, and the ``.paths.json`` file is deleted when present.
    Other regular files of the study directory are left untouched.

    Parameters
    ----------
    study : str
        Name of the study folder inside ``self.working_dir``.
    """

    # Study directory
    root: Path = self.working_dir / study

    # List the output folders first, then delete them
    doomed = [
        entry for entry in root.iterdir()
        if entry.is_dir() and entry.name != "0_inputs"
    ]
    for entry in doomed:
        shutil.rmtree(entry)

    # Paths file
    registry = root / ".paths.json"
    if registry.exists():
        registry.unlink()

1123 

def set_inputs(self) -> None:
    """Gather the user inputs (parameters and paths) of every study.

    For each study of ``self.studies`` the method fills:

    - ``self.dict_fixed_params[study]``: values of the fixed parameters
      found in ``self.dict_inputs[study]`` (empty dict when the study has
      no fixed parameter);
    - ``self.dict_variable_params[study]``: content of the study
      ``inputs.csv`` (first column used as index) when the study has
      variable parameters or variable paths, an empty DataFrame otherwise;
    - ``self.dict_user_paths[study]``: one location per fixed path and,
      for each variable path, one location per row of ``inputs.csv``.
      A location set to ``None`` in ``self.dict_inputs[study]`` falls back
      to the study ``0_inputs`` folder (``0_inputs/0_datasets/<id>`` for
      variable paths).

    The current working directory is moved to the study directory while
    the study is processed and is set back to ``self.working_dir``
    afterwards, including when reading the inputs raises.
    """

    # Loop over studies
    for study in self.studies:

        # Go to study directory
        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        try:
            # Default location of the input files of the study
            default_dir = Path(os.getcwd()) / "0_inputs"

            # Fixed parameters
            if len(self.fixed_params[study]) > 0:
                declared = self.dict_inputs[study]
                self.dict_fixed_params[study] = {
                    k: declared[k] for k in self.fixed_params[study] if k in declared
                }
            else:
                self.dict_fixed_params[study] = {}

            # Variable parameters: the csv file is parsed once and the same
            # dataframe also provides the dataset IDs of the variable paths
            if (len(self.variable_params[study]) > 0) or \
               (len(self.variable_paths[study]) > 0):
                df_inputs = pd.read_csv(
                    filepath_or_buffer="inputs.csv",
                    index_col=0,
                )
            else:
                df_inputs = pd.DataFrame()
            self.dict_variable_params[study] = df_inputs

            user_paths = {}

            # Fixed paths
            for file in self.fixed_paths[study]:
                location = self.dict_inputs[study][file]
                if location is None:
                    location = str(default_dir / file)
                user_paths[file] = location

            # Variable paths (one location per dataset)
            for file in self.variable_paths[study]:
                per_dataset = {}
                for idx in df_inputs.index:
                    location = self.dict_inputs[study][file][idx]
                    if location is None:
                        location = str(default_dir / "0_datasets" / idx / file)
                    per_dataset[idx] = location
                user_paths[file] = per_dataset

            self.dict_user_paths[study] = user_paths

        finally:
            # Go back to working directory, even when an input file is unreadable
            os.chdir(self.working_dir)

1188 

1189 def test_inputs_settings(self) -> None: 

1190 

1191 # Loop over studies 

1192 for study in self.studies: 

1193 

1194 # Define study directory 

1195 study_dir: Path = self.working_dir / study 

1196 

1197 # Go to study directory 

1198 os.chdir(study_dir) 

1199 

1200 self.fixed_params_messages[study] = [] 

1201 self.fixed_paths_messages[study] = [] 

1202 self.fixed_params_config[study] = True 

1203 self.fixed_paths_config[study] = True 

1204 self.variable_params_messages[study] = {} 

1205 self.variable_paths_messages[study] = {} 

1206 self.variable_params_config[study] = {} 

1207 self.variable_paths_config[study] = {} 

1208 

1209 # Fixed parameters 

1210 for param, value in self.dict_fixed_params[study].items(): 

1211 if value is None: 

1212 self.fixed_params_messages[study].append(f"(X) {param}") 

1213 self.fixed_params_config[study] = False 

1214 elif not isinstance(value, self.params_type[param][0]): 

1215 self.fixed_params_messages[study].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1216 else: 

1217 self.fixed_params_messages[study].append(f"(V) {param}") 

1218 

1219 # Fixed paths 

1220 for file in self.fixed_paths[study]: 

1221 file_path: Path = Path(self.dict_user_paths[study][file]) 

1222 if not file_path.exists(): 

1223 self.fixed_paths_messages[study].append(f"(X) {file}") 

1224 self.fixed_paths_config[study] = False 

1225 else: 

1226 self.fixed_paths_messages[study].append(f"(V) {file}") 

1227 

1228 # Variable inputs 

1229 if (len(self.variable_params[study]) > 0) or \ 

1230 (len(self.variable_paths[study]) > 0): 

1231 

1232 for index in self.dict_variable_params[study].index: 

1233 

1234 self.variable_params_messages[study][index] = [] 

1235 self.variable_paths_messages[study][index] = [] 

1236 self.variable_params_config[study][index] = True 

1237 self.variable_paths_config[study][index] = True 

1238 

1239 # Variable parameters 

1240 for param in self.variable_params[study]: 

1241 value = self.dict_variable_params[study].at[index, param] 

1242 if pd.isna(value) or value == "": 

1243 self.variable_params_messages[study][index].append(f"(X) {param}") 

1244 self.variable_params_config[study][index] = False 

1245 else: 

1246 if isinstance(value, (np.integer, np.floating, np.bool_)): 

1247 value = value.item() 

1248 if not isinstance(value, self.params_type[param][0]): 

1249 self.variable_params_messages[study][index].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1250 else: 

1251 self.variable_params_messages[study][index].append(f"(V) {param}") 

1252 

1253 # Variable paths 

1254 for file in self.variable_paths[study]: 

1255 file_path: Path = Path(self.dict_user_paths[study][file][index]) 

1256 if not file_path.exists(): 

1257 self.variable_paths_messages[study][index].append(f"(X) {file}") 

1258 self.variable_paths_config[study][index] = False 

1259 else: 

1260 self.variable_paths_messages[study][index].append(f"(V) {file}") 

1261 

1262 # Go back to working directory 

1263 os.chdir(self.working_dir) 

1264 

def print_inputs_settings(self) -> None:
    """Print the status of the inputs of every study and exit on errors.

    For each study, the messages stored in ``self.fixed_*_messages`` and
    ``self.variable_*_messages`` are printed with a colour per status:
    green for ``(V)``, red for ``(X)`` and yellow for ``(!)``.
    The program is stopped with ``sys.exit(1)`` after listing the files to
    edit when an input is missing, when a parameter has an unexpected
    type, or when a study with variable inputs declares no dataset.
    """

    print()
    print(
        colored("> SETTINGS <", "blue", attrs=["reverse"]),
    )
    for study in self.studies:

        # Go to study directory (the reported input files are located from it)
        os.chdir(self.working_dir / study)
        json_file = Path.cwd() / "inputs.json"
        csv_file = Path.cwd() / "inputs.csv"

        # Printing
        print()
        print(colored(f"| {study} |", "magenta"))

        # ------------ #
        # Fixed inputs #
        # ------------ #
        line = [colored("> Common :", "blue")]
        to_fix = []
        file_reported = False
        wrong_type = False

        # Fixed parameters
        for message in self.fixed_params_messages[study]:
            if "(V)" in message:
                line.append(colored(message, "green"))
                continue
            if "(X)" in message:
                line.append(colored(message, "red"))
                # The json file is listed only once, whatever the number of missing parameters
                if not file_reported:
                    to_fix.append(colored(f"> {json_file}", "red"))
                    file_reported = True
                continue
            if "(!)" in message:
                line.append(colored(message, "yellow"))
                wrong_type = True

        # Fixed paths
        for position, message in enumerate(self.fixed_paths_messages[study]):
            if "(V)" in message:
                line.append(colored(message, "green"))
                continue
            if "(X)" in message:
                missing = self.dict_user_paths[study][self.fixed_paths[study][position]]
                line.append(colored(message, "red"))
                to_fix.append(colored(f"> {missing}", "red"))

        # Printing
        if len(line) > 1:
            print(*line)
        else:
            print(colored("None.", "blue"))

        if not (self.fixed_params_config[study] and self.fixed_paths_config[study]):
            print()
            print(colored("(X) Please set inputs :", "red"))
            for entry in to_fix:
                print(entry)
            sys.exit(1)

        if wrong_type:
            print()
            print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
            print(colored(f"> {json_file}", "red"))
            sys.exit(1)

        # --------------- #
        # Variable inputs #
        # --------------- #
        if (len(self.variable_params[study]) == 0) and \
           (len(self.variable_paths[study]) == 0):
            # Go back to working directory
            os.chdir(self.working_dir)
            continue

        to_fix = []
        file_reported = False
        wrong_type = False

        # Check if datasets have been defined
        datasets = self.dict_variable_params[study].index
        if len(datasets) == 0:
            print()
            print(colored("(X) Please declare at least one experiment in file :", "red"))
            print(colored(f"> {csv_file}", "red"))
            sys.exit(1)

        for index in datasets:

            line = [colored(f"> {index} :", "blue")]

            # Variable parameters
            for message in self.variable_params_messages[study][index]:
                if "(V)" in message:
                    line.append(colored(message, "green"))
                    continue
                if "(X)" in message:
                    line.append(colored(message, "red"))
                    # The csv file is listed only once for all the datasets
                    if not file_reported:
                        to_fix.append(colored(f"> {csv_file}", "red"))
                        file_reported = True
                    continue
                if "(!)" in message:
                    line.append(colored(message, "yellow"))
                    wrong_type = True

            # Variable paths
            for position, message in enumerate(self.variable_paths_messages[study][index]):
                if "(V)" in message:
                    line.append(colored(message, "green"))
                    continue
                if "(X)" in message:
                    missing = self.dict_user_paths[study][self.variable_paths[study][position]][index]
                    line.append(colored(message, "red"))
                    to_fix.append(colored(f"> {missing}", "red"))

            # Printing
            print(*line)

        # Entries mentioning the csv file come first (stable sort keeps the other ones in order)
        to_fix.sort(key=lambda text: "inputs.csv" not in text)
        if to_fix:
            print()
            print(colored("(X) Please set inputs :", "red"))
            for entry in to_fix:
                print(entry)
            sys.exit(1)

        if wrong_type:
            print()
            print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
            print(colored(f"> {csv_file}", "red"))
            sys.exit(1)

        # Go back to working directory
        os.chdir(self.working_dir)

1396 

def init_paths(self) -> None:
    """Initialize the dictionary of output paths of every study.

    For each study, ``self.dict_paths[study]`` is loaded from the
    ``.paths.json`` file of the study directory when it exists; otherwise
    it maps every name of ``self.output_paths`` to ``None``.
    In values that are dictionaries, the keys that are not listed in
    ``self.dict_datasets[study]`` are removed.
    """

    # Loop over studies
    for study in self.studies:

        # Paths file of the study
        source = self.working_dir / study / ".paths.json"

        if not source.exists():
            # No previous run: every output path is still unknown
            registry = dict.fromkeys(self.output_paths)
        else:
            with open(source) as f:
                registry = json.load(f)

        # Purge old datasets
        for entries in registry.values():
            if not isinstance(entries, dict):
                continue
            stale = [name for name in entries if name not in self.dict_datasets[study]]
            for name in stale:
                del entries[name]

        self.dict_paths[study] = registry

1423 

def update_analysis(self) -> None:
    """Synchronize the ``analysis.json`` file of every study with its datasets.

    For each study, ``self.dict_analysis[study]`` is loaded from
    ``analysis.json`` when the file exists (empty dict otherwise). Then,
    for every process of ``self.settings_by_process``:

    - datasets of ``self.dict_datasets[study]`` that have no entry yet
      receive their own copy of the process settings;
    - entries of datasets that are no longer declared are removed.

    The result is written back to ``analysis.json``.
    """
    # Local import: the module is only needed by this method
    import copy

    # Loop over studies
    for study in self.studies:

        # Define analysis file
        analysis_file = self.working_dir / study / "analysis.json"

        # Initialize analysis dictionary
        if analysis_file.exists():
            with open(analysis_file) as f:
                self.dict_analysis[study] = json.load(f)
        else:
            self.dict_analysis[study] = {}
        analysis = self.dict_analysis[study]

        # Browse all processes
        for proc, settings in self.settings_by_process.items():

            # Initialize proc key
            per_dataset = analysis.setdefault(proc, {})

            # Add missing datasets. Each one gets an independent copy, so that
            # editing the settings of a dataset in memory cannot alter the other
            # datasets nor the defaults held in self.settings_by_process
            for dataset in self.dict_datasets[study]:
                if dataset not in per_dataset:
                    per_dataset[dataset] = copy.deepcopy(settings)

            # Delete useless datasets
            obsolete = [
                dataset for dataset in per_dataset
                if dataset not in self.dict_datasets[study]
            ]
            for dataset in obsolete:
                del per_dataset[dataset]

        with open(analysis_file, "w") as f:
            json.dump(analysis, f, indent=4)

1466 

def clean_outputs(self) -> None:
    """Delete the outputs flagged in the ``clean_outputs`` settings of the studies.

    For every study of ``self.dict_studies["config"]`` and every output
    whose ``clean_outputs`` flag is truthy, the location(s) registered in
    ``self.dict_paths[study]`` are removed from disk: a single location
    when the registered value is a string, one location per entry when it
    is a dictionary. Any other registered value is ignored.
    """

    # Remove a location from disk, whether it is a file or a directory
    def _discard(target: str) -> None:
        location = Path(target)
        if not location.exists():
            return
        if location.is_dir():
            shutil.rmtree(target)
        else:
            location.unlink()

    # Loop over studies
    for study, settings in self.dict_studies["config"].items():

        # Delete specified outputs
        for name, enabled in settings["clean_outputs"].items():
            if not enabled:
                continue
            registered = self.dict_paths[study][name]
            if isinstance(registered, str):
                _discard(registered)
            elif isinstance(registered, dict):
                for location in registered.values():
                    _discard(location)

1489 

def purge_output_datasets(self,
    study: str,
) -> None:
    """Delete from the current directory what is not a dataset of the study.

    Every entry of the current working directory whose (resolved) name is
    not listed in ``self.dict_datasets[study]`` is removed: directories
    are deleted recursively, regular files and symbolic links are
    unlinked.

    Parameters
    ----------
    study : str
        Name of the study whose dataset IDs must be kept.
    """
    datasets = self.dict_datasets[study]

    # Snapshot the directory content before deleting anything
    for entry in list(Path.cwd().iterdir()):
        if entry.resolve().name in datasets:
            continue

        # shutil.rmtree only accepts real directories: stray files (and links,
        # which it refuses to follow) are unlinked instead of raising
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)
        else:
            entry.unlink()

1499 

def update_workflow_diagram(self,
    process: Process,
) -> None:
    """Store the description of a process in the workflow diagram.

    The entry ``self.diagram[process.name]`` is (re)written with the
    values of the ``params``, ``paths``, ``required_paths`` and
    ``output_paths`` mappings of the process (as lists) together with its
    ``allparams`` and ``allpaths`` attributes.

    Parameters
    ----------
    process : Process
        Process to describe.
    """
    # Keys are inserted in the order expected in the diagram file
    entry = {}
    entry["params"] = list(process.params.values())
    entry["allparams"] = process.allparams
    entry["paths"] = list(process.paths.values())
    entry["allpaths"] = process.allpaths
    entry["required_paths"] = list(process.required_paths.values())
    entry["output_paths"] = list(process.output_paths.values())

    self.diagram[process.name] = entry

1512 

def __call__(self) -> None:
    """Run the processes of the workflow for every configured study.

    Studies whose ``execute`` setting is falsy are skipped. For the other
    ones, each item of ``self.list_workflow`` is instantiated, gets its
    own ``<step>_<ProcessName>`` folder inside the study directory and is
    initialized. Unless its ``execute`` setting is falsy, the process is
    then launched: once per row of ``this_process.df_params`` (each row in
    its own sub-folder, rows with ``EXECUTE == 0`` being skipped) when
    ``this_process.is_case`` is true, once otherwise.

    After each executed process, ``self.dict_paths[study]`` is refreshed
    and written to ``.paths.json``. Once all the steps of a study are
    done, ``self.diagram`` is written to ``.diagram.json`` in the study
    directory. The outputs flagged for cleaning are finally deleted.
    """

    # --------------- #
    # Launch workflow #
    # --------------- #
    print()
    print(
        colored("> RUNNING <", "blue", attrs=["reverse"]),
    )

    for study, dict_study in self.dict_studies["config"].items():

        # Check if study must be executed
        if not dict_study["execute"]:

            # Printing
            print()
            print(
                colored(f"| {study} |", "magenta"),
            )
            print()
            print(colored("(!) Study is skipped.", "yellow"))

            continue

        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        for step, proc in enumerate(self.list_workflow):

            self.update_analysis()

            # Optional entries of the workflow item default to an empty dictionary
            dict_hard_params = proc.get("hard_params", {})
            user_params = proc.get("user_params", {})
            user_paths = proc.get("user_paths", {})
            required_paths = proc.get("required_paths", {})
            output_paths = proc.get("output_paths", {})
            overall_analysis = proc.get("overall_analysis", {})

            # Define class object for the current process
            process = proc["process"]
            this_process: Process = process(
                study=study,
                df_user_params=self.dict_variable_params[study],
                dict_user_params=self.dict_fixed_params[study],
                dict_user_paths=self.dict_user_paths[study],
                dict_paths=self.dict_paths[study],
                params=user_params,
                paths=user_paths,
                dict_hard_params=dict_hard_params,
                fixed_params=self.fixed_params[study],
                variable_params=self.variable_params[study],
                fixed_paths=self.fixed_paths[study],
                variable_paths=self.variable_paths[study],
                required_paths=required_paths,
                output_paths=output_paths,
                overall_analysis=overall_analysis,
                dict_analysis=self.dict_analysis[study],
                silent=self.dict_process[study][self.list_processes[step]]["silent"],
                diagram=self.diagram,
            )

            # Define process name
            this_process.name = this_process.__class__.__name__

            # Define working folder associated to the current process
            folder_name = f"{step + 1}_{this_process.name}"
            folder_path: Path = study_dir / folder_name
            folder_path.mkdir(exist_ok=True, parents=True)
            os.chdir(folder_path)

            # Initialize process
            this_process.initialize()

            # Check if process must be executed
            if not self.dict_process[study][self.list_processes[step]]["execute"]:

                # Printing
                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )
                print()
                print(colored("(!) Process is skipped.", "yellow"))

                # Update workflow diagram
                self.update_workflow_diagram(this_process)

                continue

            if this_process.is_case:

                # Define sub-folders associated to each ID of the inputs dataframe
                for idx in this_process.df_params.index:

                    # Printing
                    print()
                    print(
                        colored(f"| {study} | {this_process.name} | {idx} |", "magenta"),
                    )

                    # Check if dataset must be executed
                    if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0:

                        # Printing
                        print()
                        print(colored("(!) Experiment is skipped.", "yellow"))

                        # Go back to working folder
                        os.chdir(folder_path)

                        # Purge old output datasets
                        self.purge_output_datasets(study)

                        # Update workflow diagram
                        self.update_workflow_diagram(this_process)

                        continue

                    # Update process index
                    this_process.index = idx

                    subfolder_path = study_dir / folder_name / str(idx)
                    subfolder_path.mkdir(exist_ok=True, parents=True)
                    os.chdir(subfolder_path)

                    # Launch process
                    this_process()
                    this_process.finalize()

                    # Go back to working folder
                    os.chdir(folder_path)

                    # Purge old output datasets
                    self.purge_output_datasets(study)

            else:

                # Printing
                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )

                # Launch process
                this_process()
                this_process.finalize()

            # Update workflow diagram
            self.update_workflow_diagram(this_process)

            # Update paths dictionary
            self.dict_paths[study] = this_process.dict_paths

            # Write paths json file
            with open(study_dir / ".paths.json", "w") as f:
                json.dump(self.dict_paths[study], f, indent=4)

            # Go back to study directory
            os.chdir(study_dir)

        # Write diagram json file. The location is given explicitly because a
        # skipped process leaves the current directory inside its own folder
        with open(study_dir / ".diagram.json", "w") as f:
            json.dump(self.diagram, f, indent=4)

        # Go back to working directory
        os.chdir(self.working_dir)

    # Delete unnecessary outputs
    self.clean_outputs()