geff.solver
This module defines the main class for an ODE solver used by GEF models.
The BaseGEFSolver is the the primary class supplied by this module. It defines an algorithm by which the equations of motion for a GEF model are solved.
This makes use of the geff.bgtypes.BGSystem module to simplify conversions between numerical and physical units.
While the BaseGEFSolveris solving the ODE's it can track one or multiple Event objects. These events correspond to certain conditions, for example, the end of inflation.
Occurrences of these events can be designed to influence the solver. For example, an 'end of inflation' Event may check if
the solver has reached the end of inflation, and terminate it if it has.
The BaseGEFSolver class can be used to configure customized solvers using the class factory GEFSolver to adapt it to a specific GEF model.
1import numpy as np 2from .bgtypes import BGSystem, t, N, a, H 3from .mbm import SpecSlice 4from scipy.integrate import solve_ivp 5from typing import Callable, Tuple, ClassVar 6from ._docs import generate_docs, docs_solver 7from .utility.general import AuxTol 8 9class BaseGEFSolver: 10 known_variables : ClassVar[dict] = {"time":[t], "dynamical":[N], "static":[a], "constant":[H], "function":[], "gauge":[]} 11 """ 12 Classifies variables used by the solver according to: 13 * 'time': the name of the time parameter of the ODE's (should be "t") 14 * 'dynamical': variables evolved by the EoM (not 'gauge') 15 * 'gauge': tower of gauge-field expectation values evolved by the EoM 16 * 'static': variables computed from 'dynamical' and 'gauge' 17 * 'constant': constant variables 18 * 'function': functions of the above variables. 19 """ 20 21 known_events : ClassVar[dict] = {} 22 """The `Event` objects which are tracked by the solver.""" 23 24 def __init__(self, init_sys : BGSystem): 25 """ 26 Pass initial data to the solver. 27 28 Parameters 29 ---------- 30 init_sys : BGSystem 31 initial data used by the solver 32 """ 33 34 self.init_vals : BGSystem = BGSystem.from_system(init_sys, copy=True) 35 """Initial data for the EoM's defined at $t_0 = 0$.""" 36 37 self.init_vals.units = False 38 39 self.settings : dict ={"atol":1e-20, "rtol":1e-6, "attempts":5, "solvermethod":"RK45", "ntrstep":10} 40 """ 41 A dictionary of internal settings used by the class: 42 - atol: absolute tolerance used by `solve_ivp` (default: 1e-20) 43 - rtol: relative tolerance used by `solve_ivp` (default: 1e-6) 44 - solvermethod: method used by `solve_ivp` (default: 'RK45') 45 - attempts: attempts made by `compute_GEF_solution` (default: 5) 46 - ntrstep: `ntr` increment used in `compute_GEF_solution` (default: 10) 47 """ 48 49 self.ntr : int = 100 50 r"""Truncation number $n_{\rm tr}$, for truncated towers of ODE's.""" 51 52 self.tend : float = 120. 53 r"""The time $t_{\rm end}$ up to which the EoMs are solved.""" 54 55 #initial conditions on initialisation 56 self.set_initial_conditions_to_default() 57 58 self._test_ode() 59 60 @classmethod 61 def toggle_event(self, event_name : str, toggle : bool): 62 """ 63 Disable or enable an `Event` for the solver. 64 65 Parameters 66 ---------- 67 event_name : str 68 the name of the target 69 toggle : bool 70 if the event should be active or inactive 71 """ 72 if event_name in [event for event in self.known_events.keys()]: 73 self.known_events[event_name].active = toggle 74 humanreadable = {True:"active", False:"inactive"} 75 print(f"The event '{event_name}' is now {humanreadable[toggle]}") 76 else: 77 print(f"Unknown event: '{event_name}'") 78 return 79 80 def compute_GEF_solution(self): 81 """ 82 An algorithm to solve the GEF equations. 83 84 The solver attempts to solve the EoMs several times using `solve_eom`. 85 If this returns a `TruncationError`, `ntr` is increased by `settings['ntrstep']`. 86 Afterwards, `solve_eom` is called again until it returns a result. 87 This is done for `settings['attempts']` times or until `ntr=200` is reached. 88 89 Returns 90 ------- 91 sol 92 a bunch object returned by `solve_ivp` containing the solution 93 94 Raises 95 ------ 96 RuntimeError 97 if no solution was obtained after the maximum number of attempts. 98 """ 99 maxntr = 200 100 maxattempts = self.settings["attempts"] 101 ntrstep = self.settings["ntrstep"] 102 103 attempt=0 104 done = False 105 sol = None 106 #Run GEF 107 while not(done) and (attempt<maxattempts): 108 attempt+=1 109 try: 110 #Try to get convergent solution 111 t0, yini, vals = self.initial_conditions() 112 sol = self.solve_eom(t0, yini, vals) 113 except KeyboardInterrupt: 114 raise KeyboardInterrupt 115 except TruncationError: 116 if self.ntr<maxntr: 117 self._increase_ntr( min(ntrstep, maxntr-self.ntr) ) 118 else: 119 print("Cannot increase ntr further.") 120 break 121 else: 122 done=True 123 124 if sol is None: 125 raise RuntimeError(f"No GEF solution after {attempt} attempts.") 126 else: 127 if sol.success: 128 print("Successful GEF solution obtained. Proceeding.\n") 129 else: 130 print("Processing unsuccessful solution.\n") 131 132 return sol 133 134 ### handle initial conditions ### 135 136 @staticmethod 137 def vals_to_yini(vals : BGSystem, ntr : int) -> np.ndarray: 138 """ 139 Create an array of initial data from a BGSystem. 140 141 Parameters 142 ---------- 143 vals : BGSystem 144 a unit system with initial data 145 ntr : int 146 truncation number (relevant for dynamical gauge-fields) 147 148 Returns 149 ------- 150 yini : NDarray 151 an array of initial data 152 """ 153 vals.units = False #ensure the system is in numerical units 154 155 #In the simple version, ntr has no meaning, as there are no gauge fields 156 yini = np.zeros((1)) 157 #get N from vals 158 yini[0] = vals.N.value 159 return yini 160 161 def set_initial_conditions_to_default(self): 162 """ 163 Configure the solver to use initial data from `init_vals` using `vals_to_yini`. 164 """ 165 def default_initial_conditions(): 166 """ 167 Compute initial data with `init_vals` using `vals_to_yini`. 168 """ 169 t0 = 0 170 vals = BGSystem.from_system(self.init_vals, True) 171 vals.units = False 172 yini = self.vals_to_yini(vals, self.ntr) 173 return t0, yini, vals 174 self.initial_conditions = staticmethod(default_initial_conditions) 175 return 176 177 178 def set_initial_conditions_to_MbM(self, sol, reinit_spec : SpecSlice): 179 r""" 180 Configure the solver to initialise data from a mode-by-mode solution. 181 182 Used for mode-by-mode self correction. 183 Gauge-bilinears, $F_{\mathcal X}^{(n>1)}$ are re-initialized using `.mbm.SpecSlice.integrate_slice`. 184 185 Parameters 186 ---------- 187 sol 188 a bunch object returned by `solve_ivp` containing the solution 189 reinit_spec : SpecSlice 190 spectrum at time of reinitialization 191 """ 192 def MbM_initial_conditions(): 193 ntr = self.ntr 194 195 t_reinit = reinit_spec["t"] 196 197 reinit_ind = np.searchsorted(sol.t,t_reinit, "left") 198 199 #Create unit system (copy to also get constants and functions): 200 temp = BGSystem.from_system(self.init_vals, copy=True) 201 202 #get number of regular dynamical variables 203 n_dynam = len(self.known_variables["dynamical"]) 204 205 #construct yini 206 yini = np.zeros(((ntr+1)*3+n_dynam)) 207 208 #parse everyting except for GEF-bilinears with n>1 to yini 209 yini[:n_dynam+3] = sol.y[:n_dynam+3,reinit_ind] 210 211 # compute En, Bn, Gn, for n>1 from Modes 212 yini[n_dynam+3:] = np.array([reinit_spec.integrate_slice(n=n, integrator="simpson") 213 for n in range(1,ntr+1)])[:,:,0].reshape(3*(ntr)) 214 215 self.parse_arr_to_sys(t_reinit, yini, temp) 216 217 return t_reinit, yini, temp 218 self.initial_conditions = staticmethod(MbM_initial_conditions) 219 return 220 221 ### Define ODE ### 222 223 @staticmethod 224 def update_vals(t : float, y : np.ndarray, vals : BGSystem): 225 """ 226 Translate an array of data at time t into a `BGSystem`. 227 228 Parameters 229 ---------- 230 t : float 231 time coordinate 232 y : NDArray: 233 array of data at time t 234 vals : BGSystem 235 the target system 236 atol : float 237 absolute tolerance parameters (if needed) 238 rtol : float 239 relative tolerance parameters (if needed) 240 """ 241 242 #parse dynamical variables back to vals: 243 vals.t.value = t 244 vals.N.value = y[0] 245 246 #compute static variables from y: 247 vals.a.value = (np.exp(y[0])) 248 249 #in case heaviside functions are needed in update_vals, atol and rtol are passed 250 return 251 252 @staticmethod 253 def timestep(t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 254 """ 255 Compute time derivatives for data at time t using a `BGSystem`. 256 257 Parameters 258 ---------- 259 t : float 260 a time coordinate 261 y : NDArray: 262 an array of data at time t 263 vals : BGSystem 264 the system used to compute the derivative 265 atol : float 266 absolute tolerance parameters (if needed) 267 rtol : float 268 relative tolerance parameters (if needed) 269 270 Returns 271 ------- 272 dydt : NDArray 273 the time derivative of y 274 """ 275 dydt = np.zeros_like(y) 276 277 dydt[0] = vals.H.value #use the constant hubble rate to evolve N 278 return dydt 279 280 def ode(self, t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 281 """ 282 subsequently call `update_vals` and `timestep` to formulate an ODE for `solve_ivp`. 283 284 Parameters 285 ---------- 286 t : float 287 a time coordinate 288 y : NDArray: 289 an array of data at time t 290 vals : BGSystem 291 the system passed to `update_vals` and `timestep` and 292 293 Returns 294 ------- 295 dydt : NDArray 296 the time derivative of y 297 """ 298 self.update_vals(t, y, vals) 299 dydt = self.timestep(t, y, vals) 300 return dydt 301 302 ### solve EoMs ### 303 304 def solve_eom(self, t0 : float, yini : np.ndarray, vals : BGSystem): 305 """ 306 Attempt to solve the GEF EoM's using `scipy.integrate.solve_ivp`. 307 308 The solver attempts to obtain a GEF solution. This solution is then checked for any `ErrorEvent` occurrences. 309 In this case, the solver marks the solution as unsuccessful and returns it for further processing. 310 If no `ErrorEvent` occurrences are found, the other `TerminalEvent` occurrences are analyzed. 311 These decide if the solution is returned, repeated, or if the solver should continue to solve. 312 If there are no active `TerminalEvent` instances, the solution is returned after reaching `settings['tend']`. 313 314 Parameters 315 ---------- 316 t0 : float 317 time of initialization 318 yini : NDArray 319 initial data at t0 320 vals : BGSystem 321 evolved alongside yini 322 323 Returns 324 ------- 325 sol 326 a bunch object returned by `solve_ivp` containing the solution 327 328 Raises 329 ------ 330 TruncationError: 331 if an internal error occurred while solving the ODE. 332 RuntimeError: 333 if no 'finish' or 'error' command is obtained from an `Event` and `settings['tend']` is also not reached. 334 """ 335 done = False 336 attempts = 0 337 sols = [] 338 339 event_dict, event_funcs = self._setup_events() 340 341 print(f"The GEFSolver aims at reaching t={self.tend} with ntr={self.ntr}.") 342 while not(done) and attempts < 10: 343 344 try: 345 tend = self.tend 346 atol = self.settings["atol"] 347 rtol = self.settings["rtol"] 348 solvermethod = self.settings["solvermethod"] 349 350 teval = np.arange(np.ceil(10*t0), np.floor(10*tend) +1)/10 #hotfix 351 352 sol = solve_ivp(self.ode, [t0,tend], yini, t_eval=teval, args=(vals,), 353 method=solvermethod, atol=atol, rtol=rtol, events=event_funcs) 354 if not(sol.success): 355 raise TruncationError 356 #find 357 except KeyboardInterrupt: 358 raise KeyboardInterrupt(f"Interrupted solver at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}.") 359 except Exception as e: 360 print(f"While solving the GEF ODE, an error occurred at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}):") 361 print(e) 362 raise TruncationError 363 364 else: 365 sols.append(sol) 366 367 event_dict_new, command, terminal_event = self._assess_event_occurrences(sol.t_events, sol.y_events, vals) 368 369 for key in event_dict_new.keys(): 370 event_dict[key].append(event_dict_new[key]) 371 372 if command in ["finish", "error"]: 373 done=True 374 elif command=="repeat": 375 print("Repeating") 376 sols.pop() 377 elif command=="proceed": 378 #print("Proceeding") 379 t0 = sol.t[-1] 380 yini = sol.y[:,-1] 381 382 sol = self._finalise_solution(sols, event_dict, terminal_event) 383 384 if attempts != 1 and not(done): 385 print(f"The run failed after {attempts} attempts.") 386 raise RuntimeError 387 388 return sol 389 390 391 def _setup_events(self): 392 event_funcs = [] 393 event_dict = {} 394 for name, event in self.known_events.items(): 395 if event.active: 396 event_funcs.append(event.event_func) 397 event_dict[name] = [] 398 399 return event_dict, event_funcs 400 401 402 def _assess_event_occurrences(self, t_events, y_events, vals): 403 404 event_dict = {} 405 406 active_events = [event for event in self.known_events.values() if event.active] 407 408 # record which events have occurred 409 occurrences = {} 410 for i, event in enumerate(active_events): 411 412 #Check if the event occured 413 occurrence = (len(t_events[i]) != 0) 414 415 #Add the event occurrence to the event dictionary: 416 if occurrence: 417 event_dict.update({event.name:t_events[i]}) 418 if len(t_events[i])>1: 419 tstr = [f"{t:.1f}" for t in t_events[i]] 420 else: 421 tstr = f"{t_events[i][0]:.1f}" 422 print(f"{event.name} at t={tstr}") 423 occurrences[event.name] = occurrence 424 425 error_events = [event for event in active_events if event.type=="error"] 426 427 428 # Treat occurrences of ErrorEvents 429 for event in error_events: 430 if occurrences[event.name]: 431 print(f"Error: {event.message}") 432 return event_dict, "error", event.name 433 434 # if not ErrorEvents occurred, treat TerminalEvents 435 terminal_events = [event for event in active_events if event.type=="terminal"] 436 commands = {"primary":[], "secondary":[]} 437 438 for event in terminal_events: 439 #Asses the events consequences based on its occurrence or non-occurrence 440 primary, secondary = event.event_consequence(vals, occurrences[event.name]) 441 442 for key, item in {"primary":(primary, event.name), "secondary":secondary}.items(): 443 commands[key].append(item) 444 445 # If no error has occurred, handle secondary commands 446 for command in commands["secondary"]: 447 for key, item in command.items(): 448 if key =="timestep": 449 setattr(self, key, item) 450 elif key=="tend": 451 if item > self.tend: 452 print(f"Increasing tend by {item-self.tend:.1f} to {item:.1f}") 453 setattr(self, key, item) 454 elif key in self.settings.keys(): 455 self.update_settings({key:item}) 456 else: 457 print(f"Unknown setting '{key}', ignoring input.") 458 459 #Check command priority (in case of multiple terminal events). finish > repeat > proceed 460 for primary_command in ["finish", "repeat", "proceed"]: 461 for item in commands["primary"]: 462 command = item[0] 463 trigger = item[1] 464 if command == primary_command: 465 return event_dict, command, trigger 466 467 #if no primarycommand was passed, return "finish" (no TerminalEvent or ErrorEvents) 468 return event_dict, "finish", None 469 470 def _finalise_solution(self, sols, event_dict, trigger_event): 471 nfevs = 0 472 y = [] 473 t = [] 474 solution = sols[-1] 475 for s in sols: 476 t.append(s.t[:-1]) 477 y.append(s.y[:,:-1]) 478 nfevs += s.nfev 479 t.append(np.array([s.t[-1]])) 480 y.append(np.array([s.y[:,-1]]).T) 481 482 t = np.concatenate(t) 483 y = np.concatenate(y, axis=1) 484 485 solution.t = t 486 solution.y = y 487 solution.nfev = nfevs 488 489 if trigger_event is None: 490 solution.success = True 491 else: 492 solution.success = (trigger_event not in [event.name for event in self.known_events.values() if event.active and event.type=="error"]) 493 solution.message = f"A terminal event occured: '{trigger_event}'" 494 495 for event_name in (event_dict.keys()): 496 try: 497 event_dict[event_name] = np.concatenate(event_dict[event_name]) 498 except ValueError: 499 event_dict[event_name] = np.array(event_dict[event_name]) 500 501 solution.events = event_dict 502 return solution 503 504 ### Utility fumctions ### 505 506 #can become an internal-only method once the GEFSolver output is changed. 507 def parse_arr_to_sys(self, t : float, y : np.ndarray, vals : BGSystem): 508 """ 509 Translate a GEF solution array to a BGSystem. 510 511 Parameters 512 ---------- 513 t : NDArray 514 array of time coordinates 515 y : NDArray 516 array of variables at time t 517 vals : BGSystem 518 the target system 519 """ 520 vals.units = False 521 self.update_vals(t, y, vals) 522 return 523 524 def update_settings(self, **new_settings): 525 """ 526 Update the `settings` of the class. 527 """ 528 unknown_settings = [] 529 530 for setting, value in new_settings.items(): 531 if setting not in self.settings.keys(): 532 unknown_settings.append(setting) 533 elif value != self.settings[setting]: 534 print(f"Changing '{setting}' from {self.settings[setting]} to {value}.") 535 self.settings[setting] = value 536 if setting in ["atol", "rtol"]: 537 setattr(AuxTol, setting, value) 538 539 if len(unknown_settings) > 0: 540 print(f"Unknown settings: {unknown_settings}") 541 return 542 543 def _increase_ntr(self, val : int): 544 self.ntr+=val 545 print(f"Increasing 'ntr' by {val} to {self.ntr}.") 546 return 547 548 def _test_ode(self): 549 try: 550 t, yini, vals = self.initial_conditions() 551 except Exception: 552 raise ODECompilationError("Error while computing initial conditions.") 553 try: 554 self.ode(t, yini, vals) 555 except Exception: 556 raise ODECompilationError("Error while trying to compute a single ODE timestep.") 557 558 559def GEFSolver(new_init : Callable, new_update_vals : Callable, new_timestep : Callable, new_variables : dict, new_events : list['Event'] = []): 560 561 class CustomGEFSolver(BaseGEFSolver): 562 vals_to_yini = staticmethod(new_init) 563 update_vals = staticmethod(new_update_vals) 564 timestep = staticmethod(new_timestep) 565 known_variables = new_variables 566 known_events = {event.name : event for event in new_events} 567 568 CustomGEFSolver.__qualname__ = "GEFSolver" 569 CustomGEFSolver.__module__ = __name__ 570 return CustomGEFSolver 571 572 573class Event: 574 def __init__(self, name : str, eventtype : str, func : Callable, terminal : bool, direction : int): 575 """ 576 Initialise the event as `active`. 577 578 Parameters 579 ---------- 580 name : str 581 sets the `name` attribute 582 eventtype : str 583 sets the `type` attribute 584 func : Callable 585 sets the `event_func` attribute 586 terminal : boolean 587 defines if the event occurrence is terminal or not 588 direction : int 589 defines the direction for which event occurrences are tracked 590 """ 591 self.name : str= name 592 """The name of the event.""" 593 594 self.type : str= eventtype 595 """The event type 'terminal', 'error', or 'observer'.""" 596 597 self.active : bool = True 598 """The events state, `False` implies the `Event` is disregarded by the solver.""" 599 600 func.terminal = terminal 601 func.direction = direction 602 603 self.event_func = func 604 return 605 606 @staticmethod 607 def event_func(t : float, y : np.ndarray, sys : BGSystem) -> float: 608 """ 609 The event tracked by `Event` 610 611 This method is overwritten by the `func` input upon class initialisation. 612 The signature and return of `func`needs to match this method 613 614 Parameters 615 ---------- 616 t : float 617 time 618 y : np.ndarray 619 the solution array 620 sys : BGSystem 621 the system which is evolved alongside y 622 623 Returns 624 ------- 625 condition : float 626 condition=0 is an event occurrence 627 """ 628 return 1. 629 630class TerminalEvent(Event): 631 """ 632 An `Event` subclass whose occurrence terminates the solver. 633 634 When the solver has terminated (due to an event or otherwise) it checks for `TerminalEvent` occurrences. 635 This calls the `event_consequence` method, which returns instructions to `BaseGEFSolver`. 636 These instructions may be different depending on an event occurrence or a non-occurrence. 637 """ 638 def __init__(self, name : str, func : Callable, direction : int, consequence : Callable): 639 """ 640 Initialise the parent class and overwrite the `event_consequence` method. 641 642 Parameters 643 ---------- 644 name : str 645 sets the `name` attribute 646 func : Callable 647 sets the `event_func` attribute 648 direction : int 649 passed as `direction` to the parent class 650 consequence : Callable 651 overwrites the `event_consequence` method 652 """ 653 super().__init__(name, "terminal", func, True, direction) 654 #ToDo: check for Consequence signature (once it is fixed) 655 self.event_consequence = staticmethod(consequence) 656 657 @staticmethod 658 def event_consequence(sys : BGSystem, occurrence : bool) -> Tuple[str, dict]: 659 """ 660 Inform the solver how to handle a (non-)occurrence of the event. 661 662 This method is overwritten by the `consequence` input upon class initialisation. 663 664 The methods returns are treated as an instruction to the solver: 665 - `primary`: this informs the solver what to do with its ODE solution: 666 - 'finish': the solver returns its solution marked as successful. 667 - 'proceed': the solver continues solving from the termination time onwards. 668 - 'repeat': the solver recomputes the solution. 669 - `secondary`: this informs the solver if any of its settings need to be changed. 670 Allowed attributes are 'timestep', 'tend', 'atol', 'rtol'. See `BaseGEFSolver` for more details. 671 672 Parameters 673 ---------- 674 sys : BGSystem 675 a system containing the solution of the solver 676 occurrence : bool 677 indicates if the event occurred during the solution or not 678 679 Returns 680 ------- 681 primary : str 682 either 'finish', 'proceed' or 'repeat' 683 secondary : dict 684 the affected settings as keys and their new value as an item 685 """ 686 if occurrence: 687 return "proceed", {} 688 else: 689 return "proceed", {} 690 691 692class ErrorEvent(Event): 693 """ 694 An `Event` subclass whose occurrence indicates undesired behavior of the solution. 695 696 When the solver terminates with an `ErrorEvent`, `BaseGEFSolver.solve_eom` returns the solution as unsuccessful. 697 """ 698 def __init__(self, name : str, func : Callable, direction : int, message : str): 699 """Initialise the parent class. 700 701 The additional parameter `message` is printed on an event occurrence 702 """ 703 super().__init__(name, "error", func, True, direction) 704 self.message = message 705 706class ObserverEvent(Event): 707 """An `Event` which does not terminate the solve, but only records its occurences.""" 708 def __init__(self, name : str, func : Callable, direction : int): 709 """Initialise the parent class.""" 710 super().__init__(name, "observer", func, False, direction) 711 712class TruncationError(Exception): 713 """ 714 Exception indicating that a GEF solution was unsuccessful. 715 """ 716 pass 717 718class ODECompilationError(Exception): 719 """ 720 Exception indicating that an error occured while testing the ODE of a model. 721 """ 722 pass 723 724#define longer method docs from docs_solver: 725generate_docs(docs_solver.DOCS)
11class BaseGEFSolver: 12 known_variables : ClassVar[dict] = {"time":[t], "dynamical":[N], "static":[a], "constant":[H], "function":[], "gauge":[]} 13 """ 14 Classifies variables used by the solver according to: 15 * 'time': the name of the time parameter of the ODE's (should be "t") 16 * 'dynamical': variables evolved by the EoM (not 'gauge') 17 * 'gauge': tower of gauge-field expectation values evolved by the EoM 18 * 'static': variables computed from 'dynamical' and 'gauge' 19 * 'constant': constant variables 20 * 'function': functions of the above variables. 21 """ 22 23 known_events : ClassVar[dict] = {} 24 """The `Event` objects which are tracked by the solver.""" 25 26 def __init__(self, init_sys : BGSystem): 27 """ 28 Pass initial data to the solver. 29 30 Parameters 31 ---------- 32 init_sys : BGSystem 33 initial data used by the solver 34 """ 35 36 self.init_vals : BGSystem = BGSystem.from_system(init_sys, copy=True) 37 """Initial data for the EoM's defined at $t_0 = 0$.""" 38 39 self.init_vals.units = False 40 41 self.settings : dict ={"atol":1e-20, "rtol":1e-6, "attempts":5, "solvermethod":"RK45", "ntrstep":10} 42 """ 43 A dictionary of internal settings used by the class: 44 - atol: absolute tolerance used by `solve_ivp` (default: 1e-20) 45 - rtol: relative tolerance used by `solve_ivp` (default: 1e-6) 46 - solvermethod: method used by `solve_ivp` (default: 'RK45') 47 - attempts: attempts made by `compute_GEF_solution` (default: 5) 48 - ntrstep: `ntr` increment used in `compute_GEF_solution` (default: 10) 49 """ 50 51 self.ntr : int = 100 52 r"""Truncation number $n_{\rm tr}$, for truncated towers of ODE's.""" 53 54 self.tend : float = 120. 55 r"""The time $t_{\rm end}$ up to which the EoMs are solved.""" 56 57 #initial conditions on initialisation 58 self.set_initial_conditions_to_default() 59 60 self._test_ode() 61 62 @classmethod 63 def toggle_event(self, event_name : str, toggle : bool): 64 """ 65 Disable or enable an `Event` for the solver. 66 67 Parameters 68 ---------- 69 event_name : str 70 the name of the target 71 toggle : bool 72 if the event should be active or inactive 73 """ 74 if event_name in [event for event in self.known_events.keys()]: 75 self.known_events[event_name].active = toggle 76 humanreadable = {True:"active", False:"inactive"} 77 print(f"The event '{event_name}' is now {humanreadable[toggle]}") 78 else: 79 print(f"Unknown event: '{event_name}'") 80 return 81 82 def compute_GEF_solution(self): 83 """ 84 An algorithm to solve the GEF equations. 85 86 The solver attempts to solve the EoMs several times using `solve_eom`. 87 If this returns a `TruncationError`, `ntr` is increased by `settings['ntrstep']`. 88 Afterwards, `solve_eom` is called again until it returns a result. 89 This is done for `settings['attempts']` times or until `ntr=200` is reached. 90 91 Returns 92 ------- 93 sol 94 a bunch object returned by `solve_ivp` containing the solution 95 96 Raises 97 ------ 98 RuntimeError 99 if no solution was obtained after the maximum number of attempts. 100 """ 101 maxntr = 200 102 maxattempts = self.settings["attempts"] 103 ntrstep = self.settings["ntrstep"] 104 105 attempt=0 106 done = False 107 sol = None 108 #Run GEF 109 while not(done) and (attempt<maxattempts): 110 attempt+=1 111 try: 112 #Try to get convergent solution 113 t0, yini, vals = self.initial_conditions() 114 sol = self.solve_eom(t0, yini, vals) 115 except KeyboardInterrupt: 116 raise KeyboardInterrupt 117 except TruncationError: 118 if self.ntr<maxntr: 119 self._increase_ntr( min(ntrstep, maxntr-self.ntr) ) 120 else: 121 print("Cannot increase ntr further.") 122 break 123 else: 124 done=True 125 126 if sol is None: 127 raise RuntimeError(f"No GEF solution after {attempt} attempts.") 128 else: 129 if sol.success: 130 print("Successful GEF solution obtained. Proceeding.\n") 131 else: 132 print("Processing unsuccessful solution.\n") 133 134 return sol 135 136 ### handle initial conditions ### 137 138 @staticmethod 139 def vals_to_yini(vals : BGSystem, ntr : int) -> np.ndarray: 140 """ 141 Create an array of initial data from a BGSystem. 142 143 Parameters 144 ---------- 145 vals : BGSystem 146 a unit system with initial data 147 ntr : int 148 truncation number (relevant for dynamical gauge-fields) 149 150 Returns 151 ------- 152 yini : NDarray 153 an array of initial data 154 """ 155 vals.units = False #ensure the system is in numerical units 156 157 #In the simple version, ntr has no meaning, as there are no gauge fields 158 yini = np.zeros((1)) 159 #get N from vals 160 yini[0] = vals.N.value 161 return yini 162 163 def set_initial_conditions_to_default(self): 164 """ 165 Configure the solver to use initial data from `init_vals` using `vals_to_yini`. 166 """ 167 def default_initial_conditions(): 168 """ 169 Compute initial data with `init_vals` using `vals_to_yini`. 170 """ 171 t0 = 0 172 vals = BGSystem.from_system(self.init_vals, True) 173 vals.units = False 174 yini = self.vals_to_yini(vals, self.ntr) 175 return t0, yini, vals 176 self.initial_conditions = staticmethod(default_initial_conditions) 177 return 178 179 180 def set_initial_conditions_to_MbM(self, sol, reinit_spec : SpecSlice): 181 r""" 182 Configure the solver to initialise data from a mode-by-mode solution. 183 184 Used for mode-by-mode self correction. 185 Gauge-bilinears, $F_{\mathcal X}^{(n>1)}$ are re-initialized using `.mbm.SpecSlice.integrate_slice`. 186 187 Parameters 188 ---------- 189 sol 190 a bunch object returned by `solve_ivp` containing the solution 191 reinit_spec : SpecSlice 192 spectrum at time of reinitialization 193 """ 194 def MbM_initial_conditions(): 195 ntr = self.ntr 196 197 t_reinit = reinit_spec["t"] 198 199 reinit_ind = np.searchsorted(sol.t,t_reinit, "left") 200 201 #Create unit system (copy to also get constants and functions): 202 temp = BGSystem.from_system(self.init_vals, copy=True) 203 204 #get number of regular dynamical variables 205 n_dynam = len(self.known_variables["dynamical"]) 206 207 #construct yini 208 yini = np.zeros(((ntr+1)*3+n_dynam)) 209 210 #parse everyting except for GEF-bilinears with n>1 to yini 211 yini[:n_dynam+3] = sol.y[:n_dynam+3,reinit_ind] 212 213 # compute En, Bn, Gn, for n>1 from Modes 214 yini[n_dynam+3:] = np.array([reinit_spec.integrate_slice(n=n, integrator="simpson") 215 for n in range(1,ntr+1)])[:,:,0].reshape(3*(ntr)) 216 217 self.parse_arr_to_sys(t_reinit, yini, temp) 218 219 return t_reinit, yini, temp 220 self.initial_conditions = staticmethod(MbM_initial_conditions) 221 return 222 223 ### Define ODE ### 224 225 @staticmethod 226 def update_vals(t : float, y : np.ndarray, vals : BGSystem): 227 """ 228 Translate an array of data at time t into a `BGSystem`. 229 230 Parameters 231 ---------- 232 t : float 233 time coordinate 234 y : NDArray: 235 array of data at time t 236 vals : BGSystem 237 the target system 238 atol : float 239 absolute tolerance parameters (if needed) 240 rtol : float 241 relative tolerance parameters (if needed) 242 """ 243 244 #parse dynamical variables back to vals: 245 vals.t.value = t 246 vals.N.value = y[0] 247 248 #compute static variables from y: 249 vals.a.value = (np.exp(y[0])) 250 251 #in case heaviside functions are needed in update_vals, atol and rtol are passed 252 return 253 254 @staticmethod 255 def timestep(t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 256 """ 257 Compute time derivatives for data at time t using a `BGSystem`. 258 259 Parameters 260 ---------- 261 t : float 262 a time coordinate 263 y : NDArray: 264 an array of data at time t 265 vals : BGSystem 266 the system used to compute the derivative 267 atol : float 268 absolute tolerance parameters (if needed) 269 rtol : float 270 relative tolerance parameters (if needed) 271 272 Returns 273 ------- 274 dydt : NDArray 275 the time derivative of y 276 """ 277 dydt = np.zeros_like(y) 278 279 dydt[0] = vals.H.value #use the constant hubble rate to evolve N 280 return dydt 281 282 def ode(self, t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 283 """ 284 subsequently call `update_vals` and `timestep` to formulate an ODE for `solve_ivp`. 285 286 Parameters 287 ---------- 288 t : float 289 a time coordinate 290 y : NDArray: 291 an array of data at time t 292 vals : BGSystem 293 the system passed to `update_vals` and `timestep` and 294 295 Returns 296 ------- 297 dydt : NDArray 298 the time derivative of y 299 """ 300 self.update_vals(t, y, vals) 301 dydt = self.timestep(t, y, vals) 302 return dydt 303 304 ### solve EoMs ### 305 306 def solve_eom(self, t0 : float, yini : np.ndarray, vals : BGSystem): 307 """ 308 Attempt to solve the GEF EoM's using `scipy.integrate.solve_ivp`. 309 310 The solver attempts to obtain a GEF solution. This solution is then checked for any `ErrorEvent` occurrences. 311 In this case, the solver marks the solution as unsuccessful and returns it for further processing. 312 If no `ErrorEvent` occurrences are found, the other `TerminalEvent` occurrences are analyzed. 313 These decide if the solution is returned, repeated, or if the solver should continue to solve. 314 If there are no active `TerminalEvent` instances, the solution is returned after reaching `settings['tend']`. 315 316 Parameters 317 ---------- 318 t0 : float 319 time of initialization 320 yini : NDArray 321 initial data at t0 322 vals : BGSystem 323 evolved alongside yini 324 325 Returns 326 ------- 327 sol 328 a bunch object returned by `solve_ivp` containing the solution 329 330 Raises 331 ------ 332 TruncationError: 333 if an internal error occurred while solving the ODE. 334 RuntimeError: 335 if no 'finish' or 'error' command is obtained from an `Event` and `settings['tend']` is also not reached. 336 """ 337 done = False 338 attempts = 0 339 sols = [] 340 341 event_dict, event_funcs = self._setup_events() 342 343 print(f"The GEFSolver aims at reaching t={self.tend} with ntr={self.ntr}.") 344 while not(done) and attempts < 10: 345 346 try: 347 tend = self.tend 348 atol = self.settings["atol"] 349 rtol = self.settings["rtol"] 350 solvermethod = self.settings["solvermethod"] 351 352 teval = np.arange(np.ceil(10*t0), np.floor(10*tend) +1)/10 #hotfix 353 354 sol = solve_ivp(self.ode, [t0,tend], yini, t_eval=teval, args=(vals,), 355 method=solvermethod, atol=atol, rtol=rtol, events=event_funcs) 356 if not(sol.success): 357 raise TruncationError 358 #find 359 except KeyboardInterrupt: 360 raise KeyboardInterrupt(f"Interrupted solver at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}.") 361 except Exception as e: 362 print(f"While solving the GEF ODE, an error occurred at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}):") 363 print(e) 364 raise TruncationError 365 366 else: 367 sols.append(sol) 368 369 event_dict_new, command, terminal_event = self._assess_event_occurrences(sol.t_events, sol.y_events, vals) 370 371 for key in event_dict_new.keys(): 372 event_dict[key].append(event_dict_new[key]) 373 374 if command in ["finish", "error"]: 375 done=True 376 elif command=="repeat": 377 print("Repeating") 378 sols.pop() 379 elif command=="proceed": 380 #print("Proceeding") 381 t0 = sol.t[-1] 382 yini = sol.y[:,-1] 383 384 sol = self._finalise_solution(sols, event_dict, terminal_event) 385 386 if attempts != 1 and not(done): 387 print(f"The run failed after {attempts} attempts.") 388 raise RuntimeError 389 390 return sol 391 392 393 def _setup_events(self): 394 event_funcs = [] 395 event_dict = {} 396 for name, event in self.known_events.items(): 397 if event.active: 398 event_funcs.append(event.event_func) 399 event_dict[name] = [] 400 401 return event_dict, event_funcs 402 403 404 def _assess_event_occurrences(self, t_events, y_events, vals): 405 406 event_dict = {} 407 408 active_events = [event for event in self.known_events.values() if event.active] 409 410 # record which events have occurred 411 occurrences = {} 412 for i, event in enumerate(active_events): 413 414 #Check if the event occured 415 occurrence = (len(t_events[i]) != 0) 416 417 #Add the event occurrence to the event dictionary: 418 if occurrence: 419 event_dict.update({event.name:t_events[i]}) 420 if len(t_events[i])>1: 421 tstr = [f"{t:.1f}" for t in t_events[i]] 422 else: 423 tstr = f"{t_events[i][0]:.1f}" 424 print(f"{event.name} at t={tstr}") 425 occurrences[event.name] = occurrence 426 427 error_events = [event for event in active_events if event.type=="error"] 428 429 430 # Treat occurrences of ErrorEvents 431 for event in error_events: 432 if occurrences[event.name]: 433 print(f"Error: {event.message}") 434 return event_dict, "error", event.name 435 436 # if not ErrorEvents occurred, treat TerminalEvents 437 terminal_events = [event for event in active_events if event.type=="terminal"] 438 commands = {"primary":[], "secondary":[]} 439 440 for event in terminal_events: 441 #Asses the events consequences based on its occurrence or non-occurrence 442 primary, secondary = event.event_consequence(vals, occurrences[event.name]) 443 444 for key, item in {"primary":(primary, event.name), "secondary":secondary}.items(): 445 commands[key].append(item) 446 447 # If no error has occurred, handle secondary commands 448 for command in commands["secondary"]: 449 for key, item in command.items(): 450 if key =="timestep": 451 setattr(self, key, item) 452 elif key=="tend": 453 if item > self.tend: 454 print(f"Increasing tend by {item-self.tend:.1f} to {item:.1f}") 455 setattr(self, key, item) 456 elif key in self.settings.keys(): 457 self.update_settings({key:item}) 458 else: 459 print(f"Unknown setting '{key}', ignoring input.") 460 461 #Check command priority (in case of multiple terminal events). finish > repeat > proceed 462 for primary_command in ["finish", "repeat", "proceed"]: 463 for item in commands["primary"]: 464 command = item[0] 465 trigger = item[1] 466 if command == primary_command: 467 return event_dict, command, trigger 468 469 #if no primarycommand was passed, return "finish" (no TerminalEvent or ErrorEvents) 470 return event_dict, "finish", None 471 472 def _finalise_solution(self, sols, event_dict, trigger_event): 473 nfevs = 0 474 y = [] 475 t = [] 476 solution = sols[-1] 477 for s in sols: 478 t.append(s.t[:-1]) 479 y.append(s.y[:,:-1]) 480 nfevs += s.nfev 481 t.append(np.array([s.t[-1]])) 482 y.append(np.array([s.y[:,-1]]).T) 483 484 t = np.concatenate(t) 485 y = np.concatenate(y, axis=1) 486 487 solution.t = t 488 solution.y = y 489 solution.nfev = nfevs 490 491 if trigger_event is None: 492 solution.success = True 493 else: 494 solution.success = (trigger_event not in [event.name for event in self.known_events.values() if event.active and event.type=="error"]) 495 solution.message = f"A terminal event occured: '{trigger_event}'" 496 497 for event_name in (event_dict.keys()): 498 try: 499 event_dict[event_name] = np.concatenate(event_dict[event_name]) 500 except ValueError: 501 event_dict[event_name] = np.array(event_dict[event_name]) 502 503 solution.events = event_dict 504 return solution 505 506 ### Utility fumctions ### 507 508 #can become an internal-only method once the GEFSolver output is changed. 509 def parse_arr_to_sys(self, t : float, y : np.ndarray, vals : BGSystem): 510 """ 511 Translate a GEF solution array to a BGSystem. 512 513 Parameters 514 ---------- 515 t : NDArray 516 array of time coordinates 517 y : NDArray 518 array of variables at time t 519 vals : BGSystem 520 the target system 521 """ 522 vals.units = False 523 self.update_vals(t, y, vals) 524 return 525 526 def update_settings(self, **new_settings): 527 """ 528 Update the `settings` of the class. 529 """ 530 unknown_settings = [] 531 532 for setting, value in new_settings.items(): 533 if setting not in self.settings.keys(): 534 unknown_settings.append(setting) 535 elif value != self.settings[setting]: 536 print(f"Changing '{setting}' from {self.settings[setting]} to {value}.") 537 self.settings[setting] = value 538 if setting in ["atol", "rtol"]: 539 setattr(AuxTol, setting, value) 540 541 if len(unknown_settings) > 0: 542 print(f"Unknown settings: {unknown_settings}") 543 return 544 545 def _increase_ntr(self, val : int): 546 self.ntr+=val 547 print(f"Increasing 'ntr' by {val} to {self.ntr}.") 548 return 549 550 def _test_ode(self): 551 try: 552 t, yini, vals = self.initial_conditions() 553 except Exception: 554 raise ODECompilationError("Error while computing initial conditions.") 555 try: 556 self.ode(t, yini, vals) 557 except Exception: 558 raise ODECompilationError("Error while trying to compute a single ODE timestep.")
A class used to solve the equations of motion defined by a GEF model.
The main purpose of the class is to provide the compute_GEF_solution method to run.
Internally, the method uses solve_eom, which wraps scipy.integrate.solve_ivp.
All specifications on the GEF model are encoded in the attributes known_variables and known_events, as well as
the methods vals_to_yini, update_vals and timestep.
For illustration, purposes, these methods are configured such that the GEFSolver solves the
EoMs for de Sitter expansion:
$$\frac{{\rm d} \log a}{{\rm d} t} = H_0$$
In practice, the class factory GEFSolver creates a subclass of BaseGEFSolver adapted to the EoMs of other models.
26 def __init__(self, init_sys : BGSystem): 27 """ 28 Pass initial data to the solver. 29 30 Parameters 31 ---------- 32 init_sys : BGSystem 33 initial data used by the solver 34 """ 35 36 self.init_vals : BGSystem = BGSystem.from_system(init_sys, copy=True) 37 """Initial data for the EoM's defined at $t_0 = 0$.""" 38 39 self.init_vals.units = False 40 41 self.settings : dict ={"atol":1e-20, "rtol":1e-6, "attempts":5, "solvermethod":"RK45", "ntrstep":10} 42 """ 43 A dictionary of internal settings used by the class: 44 - atol: absolute tolerance used by `solve_ivp` (default: 1e-20) 45 - rtol: relative tolerance used by `solve_ivp` (default: 1e-6) 46 - solvermethod: method used by `solve_ivp` (default: 'RK45') 47 - attempts: attempts made by `compute_GEF_solution` (default: 5) 48 - ntrstep: `ntr` increment used in `compute_GEF_solution` (default: 10) 49 """ 50 51 self.ntr : int = 100 52 r"""Truncation number $n_{\rm tr}$, for truncated towers of ODE's.""" 53 54 self.tend : float = 120. 55 r"""The time $t_{\rm end}$ up to which the EoMs are solved.""" 56 57 #initial conditions on initialisation 58 self.set_initial_conditions_to_default() 59 60 self._test_ode()
Pass initial data to the solver.
Parameters
- init_sys (BGSystem): initial data used by the solver
Classifies variables used by the solver according to:
- 'time': the name of the time parameter of the ODE's (should be "t")
- 'dynamical': variables evolved by the EoM (not 'gauge')
- 'gauge': tower of gauge-field expectation values evolved by the EoM
- 'static': variables computed from 'dynamical' and 'gauge'
- 'constant': constant variables
- 'function': functions of the above variables.
A dictionary of internal settings used by the class:
- atol: absolute tolerance used by
solve_ivp(default: 1e-20) - rtol: relative tolerance used by
solve_ivp(default: 1e-6) - solvermethod: method used by
solve_ivp(default: 'RK45') - attempts: attempts made by
compute_GEF_solution(default: 5) - ntrstep:
ntrincrement used incompute_GEF_solution(default: 10)
62 @classmethod 63 def toggle_event(self, event_name : str, toggle : bool): 64 """ 65 Disable or enable an `Event` for the solver. 66 67 Parameters 68 ---------- 69 event_name : str 70 the name of the target 71 toggle : bool 72 if the event should be active or inactive 73 """ 74 if event_name in [event for event in self.known_events.keys()]: 75 self.known_events[event_name].active = toggle 76 humanreadable = {True:"active", False:"inactive"} 77 print(f"The event '{event_name}' is now {humanreadable[toggle]}") 78 else: 79 print(f"Unknown event: '{event_name}'") 80 return
Disable or enable an Event for the solver.
Parameters
- event_name (str): the name of the target
- toggle (bool): if the event should be active or inactive
82 def compute_GEF_solution(self): 83 """ 84 An algorithm to solve the GEF equations. 85 86 The solver attempts to solve the EoMs several times using `solve_eom`. 87 If this returns a `TruncationError`, `ntr` is increased by `settings['ntrstep']`. 88 Afterwards, `solve_eom` is called again until it returns a result. 89 This is done for `settings['attempts']` times or until `ntr=200` is reached. 90 91 Returns 92 ------- 93 sol 94 a bunch object returned by `solve_ivp` containing the solution 95 96 Raises 97 ------ 98 RuntimeError 99 if no solution was obtained after the maximum number of attempts. 100 """ 101 maxntr = 200 102 maxattempts = self.settings["attempts"] 103 ntrstep = self.settings["ntrstep"] 104 105 attempt=0 106 done = False 107 sol = None 108 #Run GEF 109 while not(done) and (attempt<maxattempts): 110 attempt+=1 111 try: 112 #Try to get convergent solution 113 t0, yini, vals = self.initial_conditions() 114 sol = self.solve_eom(t0, yini, vals) 115 except KeyboardInterrupt: 116 raise KeyboardInterrupt 117 except TruncationError: 118 if self.ntr<maxntr: 119 self._increase_ntr( min(ntrstep, maxntr-self.ntr) ) 120 else: 121 print("Cannot increase ntr further.") 122 break 123 else: 124 done=True 125 126 if sol is None: 127 raise RuntimeError(f"No GEF solution after {attempt} attempts.") 128 else: 129 if sol.success: 130 print("Successful GEF solution obtained. Proceeding.\n") 131 else: 132 print("Processing unsuccessful solution.\n") 133 134 return sol
An algorithm to solve the GEF equations.
The solver attempts to solve the EoMs several times using solve_eom.
If this returns a TruncationError, ntr is increased by settings['ntrstep'].
Afterwards, solve_eom is called again until it returns a result.
This is done for settings['attempts'] times or until ntr=200 is reached.
Returns
- sol: a bunch object returned by
solve_ivpcontaining the solution
Raises
- RuntimeError: if no solution was obtained after the maximum number of attempts.
138 @staticmethod 139 def vals_to_yini(vals : BGSystem, ntr : int) -> np.ndarray: 140 """ 141 Create an array of initial data from a BGSystem. 142 143 Parameters 144 ---------- 145 vals : BGSystem 146 a unit system with initial data 147 ntr : int 148 truncation number (relevant for dynamical gauge-fields) 149 150 Returns 151 ------- 152 yini : NDarray 153 an array of initial data 154 """ 155 vals.units = False #ensure the system is in numerical units 156 157 #In the simple version, ntr has no meaning, as there are no gauge fields 158 yini = np.zeros((1)) 159 #get N from vals 160 yini[0] = vals.N.value 161 return yini
Create an array of initial data from a BGSystem.
Parameters
- vals (BGSystem): a unit system with initial data
- ntr (int): truncation number (relevant for dynamical gauge-fields)
Returns
- yini (NDarray): an array of initial data
163 def set_initial_conditions_to_default(self): 164 """ 165 Configure the solver to use initial data from `init_vals` using `vals_to_yini`. 166 """ 167 def default_initial_conditions(): 168 """ 169 Compute initial data with `init_vals` using `vals_to_yini`. 170 """ 171 t0 = 0 172 vals = BGSystem.from_system(self.init_vals, True) 173 vals.units = False 174 yini = self.vals_to_yini(vals, self.ntr) 175 return t0, yini, vals 176 self.initial_conditions = staticmethod(default_initial_conditions) 177 return
Configure the solver to use initial data from init_vals using vals_to_yini.
180 def set_initial_conditions_to_MbM(self, sol, reinit_spec : SpecSlice): 181 r""" 182 Configure the solver to initialise data from a mode-by-mode solution. 183 184 Used for mode-by-mode self correction. 185 Gauge-bilinears, $F_{\mathcal X}^{(n>1)}$ are re-initialized using `.mbm.SpecSlice.integrate_slice`. 186 187 Parameters 188 ---------- 189 sol 190 a bunch object returned by `solve_ivp` containing the solution 191 reinit_spec : SpecSlice 192 spectrum at time of reinitialization 193 """ 194 def MbM_initial_conditions(): 195 ntr = self.ntr 196 197 t_reinit = reinit_spec["t"] 198 199 reinit_ind = np.searchsorted(sol.t,t_reinit, "left") 200 201 #Create unit system (copy to also get constants and functions): 202 temp = BGSystem.from_system(self.init_vals, copy=True) 203 204 #get number of regular dynamical variables 205 n_dynam = len(self.known_variables["dynamical"]) 206 207 #construct yini 208 yini = np.zeros(((ntr+1)*3+n_dynam)) 209 210 #parse everyting except for GEF-bilinears with n>1 to yini 211 yini[:n_dynam+3] = sol.y[:n_dynam+3,reinit_ind] 212 213 # compute En, Bn, Gn, for n>1 from Modes 214 yini[n_dynam+3:] = np.array([reinit_spec.integrate_slice(n=n, integrator="simpson") 215 for n in range(1,ntr+1)])[:,:,0].reshape(3*(ntr)) 216 217 self.parse_arr_to_sys(t_reinit, yini, temp) 218 219 return t_reinit, yini, temp 220 self.initial_conditions = staticmethod(MbM_initial_conditions) 221 return
Configure the solver to initialise data from a mode-by-mode solution.
Used for mode-by-mode self correction.
Gauge-bilinears, $F_{\mathcal X}^{(n>1)}$ are re-initialized using .mbm.SpecSlice.integrate_slice.
Parameters
- sol: a bunch object returned by
solve_ivpcontaining the solution - reinit_spec (SpecSlice): spectrum at time of reinitialization
225 @staticmethod 226 def update_vals(t : float, y : np.ndarray, vals : BGSystem): 227 """ 228 Translate an array of data at time t into a `BGSystem`. 229 230 Parameters 231 ---------- 232 t : float 233 time coordinate 234 y : NDArray: 235 array of data at time t 236 vals : BGSystem 237 the target system 238 atol : float 239 absolute tolerance parameters (if needed) 240 rtol : float 241 relative tolerance parameters (if needed) 242 """ 243 244 #parse dynamical variables back to vals: 245 vals.t.value = t 246 vals.N.value = y[0] 247 248 #compute static variables from y: 249 vals.a.value = (np.exp(y[0])) 250 251 #in case heaviside functions are needed in update_vals, atol and rtol are passed 252 return
Translate an array of data at time t into a BGSystem.
Parameters
- t (float): time coordinate
- y (NDArray:): array of data at time t
- vals (BGSystem): the target system
- atol (float): absolute tolerance parameters (if needed)
- rtol (float): relative tolerance parameters (if needed)
254 @staticmethod 255 def timestep(t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 256 """ 257 Compute time derivatives for data at time t using a `BGSystem`. 258 259 Parameters 260 ---------- 261 t : float 262 a time coordinate 263 y : NDArray: 264 an array of data at time t 265 vals : BGSystem 266 the system used to compute the derivative 267 atol : float 268 absolute tolerance parameters (if needed) 269 rtol : float 270 relative tolerance parameters (if needed) 271 272 Returns 273 ------- 274 dydt : NDArray 275 the time derivative of y 276 """ 277 dydt = np.zeros_like(y) 278 279 dydt[0] = vals.H.value #use the constant hubble rate to evolve N 280 return dydt
Compute time derivatives for data at time t using a BGSystem.
Parameters
- t (float): a time coordinate
- y (NDArray:): an array of data at time t
- vals (BGSystem): the system used to compute the derivative
- atol (float): absolute tolerance parameters (if needed)
- rtol (float): relative tolerance parameters (if needed)
Returns
- dydt (NDArray): the time derivative of y
282 def ode(self, t : float, y : np.ndarray, vals : BGSystem) -> np.ndarray: 283 """ 284 subsequently call `update_vals` and `timestep` to formulate an ODE for `solve_ivp`. 285 286 Parameters 287 ---------- 288 t : float 289 a time coordinate 290 y : NDArray: 291 an array of data at time t 292 vals : BGSystem 293 the system passed to `update_vals` and `timestep` and 294 295 Returns 296 ------- 297 dydt : NDArray 298 the time derivative of y 299 """ 300 self.update_vals(t, y, vals) 301 dydt = self.timestep(t, y, vals) 302 return dydt
subsequently call update_vals and timestep to formulate an ODE for solve_ivp.
Parameters
- t (float): a time coordinate
- y (NDArray:): an array of data at time t
- vals (BGSystem):
the system passed to
update_valsandtimestepand
Returns
- dydt (NDArray): the time derivative of y
306 def solve_eom(self, t0 : float, yini : np.ndarray, vals : BGSystem): 307 """ 308 Attempt to solve the GEF EoM's using `scipy.integrate.solve_ivp`. 309 310 The solver attempts to obtain a GEF solution. This solution is then checked for any `ErrorEvent` occurrences. 311 In this case, the solver marks the solution as unsuccessful and returns it for further processing. 312 If no `ErrorEvent` occurrences are found, the other `TerminalEvent` occurrences are analyzed. 313 These decide if the solution is returned, repeated, or if the solver should continue to solve. 314 If there are no active `TerminalEvent` instances, the solution is returned after reaching `settings['tend']`. 315 316 Parameters 317 ---------- 318 t0 : float 319 time of initialization 320 yini : NDArray 321 initial data at t0 322 vals : BGSystem 323 evolved alongside yini 324 325 Returns 326 ------- 327 sol 328 a bunch object returned by `solve_ivp` containing the solution 329 330 Raises 331 ------ 332 TruncationError: 333 if an internal error occurred while solving the ODE. 334 RuntimeError: 335 if no 'finish' or 'error' command is obtained from an `Event` and `settings['tend']` is also not reached. 336 """ 337 done = False 338 attempts = 0 339 sols = [] 340 341 event_dict, event_funcs = self._setup_events() 342 343 print(f"The GEFSolver aims at reaching t={self.tend} with ntr={self.ntr}.") 344 while not(done) and attempts < 10: 345 346 try: 347 tend = self.tend 348 atol = self.settings["atol"] 349 rtol = self.settings["rtol"] 350 solvermethod = self.settings["solvermethod"] 351 352 teval = np.arange(np.ceil(10*t0), np.floor(10*tend) +1)/10 #hotfix 353 354 sol = solve_ivp(self.ode, [t0,tend], yini, t_eval=teval, args=(vals,), 355 method=solvermethod, atol=atol, rtol=rtol, events=event_funcs) 356 if not(sol.success): 357 raise TruncationError 358 #find 359 except KeyboardInterrupt: 360 raise KeyboardInterrupt(f"Interrupted solver at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}.") 361 except Exception as e: 362 print(f"While solving the GEF ODE, an error occurred at t={float(vals.t.value):.2f}, N={float(vals.N.value):.2f}):") 363 print(e) 364 raise TruncationError 365 366 else: 367 sols.append(sol) 368 369 event_dict_new, command, terminal_event = self._assess_event_occurrences(sol.t_events, sol.y_events, vals) 370 371 for key in event_dict_new.keys(): 372 event_dict[key].append(event_dict_new[key]) 373 374 if command in ["finish", "error"]: 375 done=True 376 elif command=="repeat": 377 print("Repeating") 378 sols.pop() 379 elif command=="proceed": 380 #print("Proceeding") 381 t0 = sol.t[-1] 382 yini = sol.y[:,-1] 383 384 sol = self._finalise_solution(sols, event_dict, terminal_event) 385 386 if attempts != 1 and not(done): 387 print(f"The run failed after {attempts} attempts.") 388 raise RuntimeError 389 390 return sol
Attempt to solve the GEF EoM's using scipy.integrate.solve_ivp.
The solver attempts to obtain a GEF solution. This solution is then checked for any ErrorEvent occurrences.
In this case, the solver marks the solution as unsuccessful and returns it for further processing.
If no ErrorEvent occurrences are found, the other TerminalEvent occurrences are analyzed.
These decide if the solution is returned, repeated, or if the solver should continue to solve.
If there are no active TerminalEvent instances, the solution is returned after reaching settings['tend'].
Parameters
- t0 (float): time of initialization
- yini (NDArray): initial data at t0
- vals (BGSystem): evolved alongside yini
Returns
- sol: a bunch object returned by
solve_ivpcontaining the solution
Raises
- TruncationError (): if an internal error occurred while solving the ODE.
- RuntimeError:: if no 'finish' or 'error' command is obtained from an
Eventandsettings['tend']is also not reached.
509 def parse_arr_to_sys(self, t : float, y : np.ndarray, vals : BGSystem): 510 """ 511 Translate a GEF solution array to a BGSystem. 512 513 Parameters 514 ---------- 515 t : NDArray 516 array of time coordinates 517 y : NDArray 518 array of variables at time t 519 vals : BGSystem 520 the target system 521 """ 522 vals.units = False 523 self.update_vals(t, y, vals) 524 return
Translate a GEF solution array to a BGSystem.
Parameters
- t (NDArray): array of time coordinates
- y (NDArray): array of variables at time t
- vals (BGSystem): the target system
526 def update_settings(self, **new_settings): 527 """ 528 Update the `settings` of the class. 529 """ 530 unknown_settings = [] 531 532 for setting, value in new_settings.items(): 533 if setting not in self.settings.keys(): 534 unknown_settings.append(setting) 535 elif value != self.settings[setting]: 536 print(f"Changing '{setting}' from {self.settings[setting]} to {value}.") 537 self.settings[setting] = value 538 if setting in ["atol", "rtol"]: 539 setattr(AuxTol, setting, value) 540 541 if len(unknown_settings) > 0: 542 print(f"Unknown settings: {unknown_settings}") 543 return
Update the settings of the class.
561def GEFSolver(new_init : Callable, new_update_vals : Callable, new_timestep : Callable, new_variables : dict, new_events : list['Event'] = []): 562 563 class CustomGEFSolver(BaseGEFSolver): 564 vals_to_yini = staticmethod(new_init) 565 update_vals = staticmethod(new_update_vals) 566 timestep = staticmethod(new_timestep) 567 known_variables = new_variables 568 known_events = {event.name : event for event in new_events} 569 570 CustomGEFSolver.__qualname__ = "GEFSolver" 571 CustomGEFSolver.__module__ = __name__ 572 return CustomGEFSolver
Create a subclass of BaseGEFSolver with custom equation of motions and initial conditions.
The subclass is adjusted to the specific EoMs of a new GEF model by defining new methods for BaseGEFSolver.vals_to_yini, BaseGEFSolver.update_vals and BaseGEFSolver.timestep via new_init, new_update_val and new_timestep.
Information about the classificiation of variables is encoded in new_variables which overwrites BaseGEFSolver.known_variables.
The new_init needs to obey the following rules:
- The call signature and output matches
vals_to_yini. - The return array
yinihas a shape: $n_{\rm dynam.} + 3(n_{\rm tr}+1) \times n_{\rm gauge}$. - All dynamical variables are reflected in the first $n_{\rm dynam.}$-indices of
yini. - All gauge variables are reflected in the last $3(n_{\rm tr}+1) \times n_{\rm gauge}$-indices of
yini.
Above, $n_{\rm dynam.}$ and $n_{\rm gauge}$ are, respectively, the number of dynamical and gauge-field variables.
The new_update_vals needs to obey the following rules:
- The call signature and output matches
update_vals. - It updates every static variable using values in
yandt.
The new_timestep needs to obey the following rules:
- The call signature and output matches
timestep. - It computes derivatives
dydtfor every dynamical or gauge varibale. - Static variables from
new_update_valscan be re-used, as it is called beforenew_timestep. - The indices in
dydtneed to match those ofyinireturned bynew_init.
All these functions assume that vals is in numerical units throughout the computation.
In addition, a new list of Event objects can be passed to the subclass using new_events
For an example on how to define a new solver, see GEFF.models.classic.
Parameters
- new_init (Callable):
overwrites
BaseGEFSolver.vals_to_yini - new_update_vals (Callable):
overwrites
BaseGEFSolver.update_vals - new_timestep (Callable):
overwrites
BaseGEFSolver.timestep - new_events (list of Event):
overwrites
BaseGEFSolver.known_events - new_variables (dict):
overwrites
BaseGEFSolver.known_variables
Returns
- GEFSolver: a subclass of BaseGEFSolver
575class Event: 576 def __init__(self, name : str, eventtype : str, func : Callable, terminal : bool, direction : int): 577 """ 578 Initialise the event as `active`. 579 580 Parameters 581 ---------- 582 name : str 583 sets the `name` attribute 584 eventtype : str 585 sets the `type` attribute 586 func : Callable 587 sets the `event_func` attribute 588 terminal : boolean 589 defines if the event occurrence is terminal or not 590 direction : int 591 defines the direction for which event occurrences are tracked 592 """ 593 self.name : str= name 594 """The name of the event.""" 595 596 self.type : str= eventtype 597 """The event type 'terminal', 'error', or 'observer'.""" 598 599 self.active : bool = True 600 """The events state, `False` implies the `Event` is disregarded by the solver.""" 601 602 func.terminal = terminal 603 func.direction = direction 604 605 self.event_func = func 606 return 607 608 @staticmethod 609 def event_func(t : float, y : np.ndarray, sys : BGSystem) -> float: 610 """ 611 The event tracked by `Event` 612 613 This method is overwritten by the `func` input upon class initialisation. 614 The signature and return of `func`needs to match this method 615 616 Parameters 617 ---------- 618 t : float 619 time 620 y : np.ndarray 621 the solution array 622 sys : BGSystem 623 the system which is evolved alongside y 624 625 Returns 626 ------- 627 condition : float 628 condition=0 is an event occurrence 629 """ 630 return 1.
An event which is tracked while solving the GEF equations.
The class defines a function $f(t, y)$ which is used by scipy.integrate.solve_ivp to track occurrences of $f(t, y(t))=0$.
The event can be terminal causing the solver to stop upon an event occurrence.
The event only triggers if the event condition changes sign according to:
- positive zero crossing:
direction=1 - negative derivative,
direction=-1 - arbitrary zero crossing
direction=0
The zeros are recorded and returned as part of the solvers output.
The function $f$ is encoded in the method event_func which is defined upon initialization.
Within subclasses of BaseGEFSolver class, three subclasses of Event are used:
576 def __init__(self, name : str, eventtype : str, func : Callable, terminal : bool, direction : int): 577 """ 578 Initialise the event as `active`. 579 580 Parameters 581 ---------- 582 name : str 583 sets the `name` attribute 584 eventtype : str 585 sets the `type` attribute 586 func : Callable 587 sets the `event_func` attribute 588 terminal : boolean 589 defines if the event occurrence is terminal or not 590 direction : int 591 defines the direction for which event occurrences are tracked 592 """ 593 self.name : str= name 594 """The name of the event.""" 595 596 self.type : str= eventtype 597 """The event type 'terminal', 'error', or 'observer'.""" 598 599 self.active : bool = True 600 """The events state, `False` implies the `Event` is disregarded by the solver.""" 601 602 func.terminal = terminal 603 func.direction = direction 604 605 self.event_func = func 606 return
Initialise the event as active.
Parameters
- name (str):
sets the
nameattribute - eventtype (str):
sets the
typeattribute - func (Callable):
sets the
event_funcattribute - terminal (boolean): defines if the event occurrence is terminal or not
- direction (int): defines the direction for which event occurrences are tracked
608 @staticmethod 609 def event_func(t : float, y : np.ndarray, sys : BGSystem) -> float: 610 """ 611 The event tracked by `Event` 612 613 This method is overwritten by the `func` input upon class initialisation. 614 The signature and return of `func`needs to match this method 615 616 Parameters 617 ---------- 618 t : float 619 time 620 y : np.ndarray 621 the solution array 622 sys : BGSystem 623 the system which is evolved alongside y 624 625 Returns 626 ------- 627 condition : float 628 condition=0 is an event occurrence 629 """ 630 return 1.
The event tracked by Event
This method is overwritten by the func input upon class initialisation.
The signature and return of funcneeds to match this method
Parameters
- t (float): time
- y (np.ndarray): the solution array
- sys (BGSystem): the system which is evolved alongside y
Returns
- condition (float): condition=0 is an event occurrence
632class TerminalEvent(Event): 633 """ 634 An `Event` subclass whose occurrence terminates the solver. 635 636 When the solver has terminated (due to an event or otherwise) it checks for `TerminalEvent` occurrences. 637 This calls the `event_consequence` method, which returns instructions to `BaseGEFSolver`. 638 These instructions may be different depending on an event occurrence or a non-occurrence. 639 """ 640 def __init__(self, name : str, func : Callable, direction : int, consequence : Callable): 641 """ 642 Initialise the parent class and overwrite the `event_consequence` method. 643 644 Parameters 645 ---------- 646 name : str 647 sets the `name` attribute 648 func : Callable 649 sets the `event_func` attribute 650 direction : int 651 passed as `direction` to the parent class 652 consequence : Callable 653 overwrites the `event_consequence` method 654 """ 655 super().__init__(name, "terminal", func, True, direction) 656 #ToDo: check for Consequence signature (once it is fixed) 657 self.event_consequence = staticmethod(consequence) 658 659 @staticmethod 660 def event_consequence(sys : BGSystem, occurrence : bool) -> Tuple[str, dict]: 661 """ 662 Inform the solver how to handle a (non-)occurrence of the event. 663 664 This method is overwritten by the `consequence` input upon class initialisation. 665 666 The methods returns are treated as an instruction to the solver: 667 - `primary`: this informs the solver what to do with its ODE solution: 668 - 'finish': the solver returns its solution marked as successful. 669 - 'proceed': the solver continues solving from the termination time onwards. 670 - 'repeat': the solver recomputes the solution. 671 - `secondary`: this informs the solver if any of its settings need to be changed. 672 Allowed attributes are 'timestep', 'tend', 'atol', 'rtol'. See `BaseGEFSolver` for more details. 673 674 Parameters 675 ---------- 676 sys : BGSystem 677 a system containing the solution of the solver 678 occurrence : bool 679 indicates if the event occurred during the solution or not 680 681 Returns 682 ------- 683 primary : str 684 either 'finish', 'proceed' or 'repeat' 685 secondary : dict 686 the affected settings as keys and their new value as an item 687 """ 688 if occurrence: 689 return "proceed", {} 690 else: 691 return "proceed", {}
An Event subclass whose occurrence terminates the solver.
When the solver has terminated (due to an event or otherwise) it checks for TerminalEvent occurrences.
This calls the event_consequence method, which returns instructions to BaseGEFSolver.
These instructions may be different depending on an event occurrence or a non-occurrence.
640 def __init__(self, name : str, func : Callable, direction : int, consequence : Callable): 641 """ 642 Initialise the parent class and overwrite the `event_consequence` method. 643 644 Parameters 645 ---------- 646 name : str 647 sets the `name` attribute 648 func : Callable 649 sets the `event_func` attribute 650 direction : int 651 passed as `direction` to the parent class 652 consequence : Callable 653 overwrites the `event_consequence` method 654 """ 655 super().__init__(name, "terminal", func, True, direction) 656 #ToDo: check for Consequence signature (once it is fixed) 657 self.event_consequence = staticmethod(consequence)
Initialise the parent class and overwrite the event_consequence method.
Parameters
- name (str):
sets the
nameattribute - func (Callable):
sets the
event_funcattribute - direction (int):
passed as
directionto the parent class - consequence (Callable):
overwrites the
event_consequencemethod
659 @staticmethod 660 def event_consequence(sys : BGSystem, occurrence : bool) -> Tuple[str, dict]: 661 """ 662 Inform the solver how to handle a (non-)occurrence of the event. 663 664 This method is overwritten by the `consequence` input upon class initialisation. 665 666 The methods returns are treated as an instruction to the solver: 667 - `primary`: this informs the solver what to do with its ODE solution: 668 - 'finish': the solver returns its solution marked as successful. 669 - 'proceed': the solver continues solving from the termination time onwards. 670 - 'repeat': the solver recomputes the solution. 671 - `secondary`: this informs the solver if any of its settings need to be changed. 672 Allowed attributes are 'timestep', 'tend', 'atol', 'rtol'. See `BaseGEFSolver` for more details. 673 674 Parameters 675 ---------- 676 sys : BGSystem 677 a system containing the solution of the solver 678 occurrence : bool 679 indicates if the event occurred during the solution or not 680 681 Returns 682 ------- 683 primary : str 684 either 'finish', 'proceed' or 'repeat' 685 secondary : dict 686 the affected settings as keys and their new value as an item 687 """ 688 if occurrence: 689 return "proceed", {} 690 else: 691 return "proceed", {}
Inform the solver how to handle a (non-)occurrence of the event.
This method is overwritten by the consequence input upon class initialisation.
The methods returns are treated as an instruction to the solver:
primary: this informs the solver what to do with its ODE solution:- 'finish': the solver returns its solution marked as successful.
- 'proceed': the solver continues solving from the termination time onwards.
- 'repeat': the solver recomputes the solution.
secondary: this informs the solver if any of its settings need to be changed. Allowed attributes are 'timestep', 'tend', 'atol', 'rtol'. SeeBaseGEFSolverfor more details.
Parameters
- sys (BGSystem): a system containing the solution of the solver
- occurrence (bool): indicates if the event occurred during the solution or not
Returns
- primary (str): either 'finish', 'proceed' or 'repeat'
- secondary (dict): the affected settings as keys and their new value as an item
Inherited Members
694class ErrorEvent(Event): 695 """ 696 An `Event` subclass whose occurrence indicates undesired behavior of the solution. 697 698 When the solver terminates with an `ErrorEvent`, `BaseGEFSolver.solve_eom` returns the solution as unsuccessful. 699 """ 700 def __init__(self, name : str, func : Callable, direction : int, message : str): 701 """Initialise the parent class. 702 703 The additional parameter `message` is printed on an event occurrence 704 """ 705 super().__init__(name, "error", func, True, direction) 706 self.message = message
An Event subclass whose occurrence indicates undesired behavior of the solution.
When the solver terminates with an ErrorEvent, BaseGEFSolver.solve_eom returns the solution as unsuccessful.
700 def __init__(self, name : str, func : Callable, direction : int, message : str): 701 """Initialise the parent class. 702 703 The additional parameter `message` is printed on an event occurrence 704 """ 705 super().__init__(name, "error", func, True, direction) 706 self.message = message
Initialise the parent class.
The additional parameter message is printed on an event occurrence
Inherited Members
708class ObserverEvent(Event): 709 """An `Event` which does not terminate the solve, but only records its occurences.""" 710 def __init__(self, name : str, func : Callable, direction : int): 711 """Initialise the parent class.""" 712 super().__init__(name, "observer", func, False, direction)
An Event which does not terminate the solve, but only records its occurences.
710 def __init__(self, name : str, func : Callable, direction : int): 711 """Initialise the parent class.""" 712 super().__init__(name, "observer", func, False, direction)
Initialise the parent class.
Inherited Members
714class TruncationError(Exception): 715 """ 716 Exception indicating that a GEF solution was unsuccessful. 717 """ 718 pass
Exception indicating that a GEF solution was unsuccessful.
720class ODECompilationError(Exception): 721 """ 722 Exception indicating that an error occured while testing the ODE of a model. 723 """ 724 pass
Exception indicating that an error occured while testing the ODE of a model.