geff.mbm
This module is used to compute and analyse spectra of gauge-field modes.
Throughout this module, any gauge-field mode functions are represented by dimensionless variables
$$\sqrt{2k} A_\lambda(t,k), \quad \sqrt{\frac{2}{k}}\, a(t)\dot{A}_\lambda(k, t), \quad \lambda = \pm 1$$
where $A_\lambda(t,k)$ are the mode functions of helicity $\lambda$ and momentum $k$ for a canonically quantized Abelian gauge-field $A_\mu(t, \boldsymbol{x})$ in Coulomb & Weyl gauge. The momentum variables $k$ are always returned in numerical units, i.e., $\bar{k} = k/\omega$.
The class BaseModeSolver is designed to solve the second order mode equation
$$\ddot{A}_\lambda(t,k) + P_\lambda(t,k)\dot{A}_\lambda(t,k) + Q_\lambda(t,k)A_\lambda(t,k) = 0 \, .$$
The base class in particular is set to solve the mode equation of pure axion inflation, $$P_\lambda(t,k) = H \, \qquad Q_\lambda(t,k) = \left(\frac{k}{a}\right)^2 - 2\lambda \left(\frac{k}{a}\right) \xi H \, ,$$ with Hubble rate $H$, scale factor $a$ and instability parameter $\xi$.
To create a mode solver with custom values for $P(t,k)$ and $Q(t,k)$, use the class factory ModeSolver.
The module also contains a class GaugeSpec designed for directly working on the spectrum of modes $A_\lambda(t,k)$.
In particular, it is used to integrate the spectrum to obtain the quantities
$$ \mathcal{F}_\mathcal{E}^{(n)}(t) = \int\limits_{0}^{k_{{\rm UV}}(t)}\frac{{\rm d} k}{k} \frac{a^2 k^{n+3}}{2 \pi^2 k_{{\rm UV}}^{n+4}} \sum_{\lambda}\lambda^n |\dot{A}_\lambda(t,k)|^2\,,$$ $$ \mathcal{F}_\mathcal{G}^{(n)}(t) = \int\limits_{0}^{k_{{\rm UV}}(t)}\frac{{\rm d} k}{k} \frac{a k^{n+4}}{2 \pi^2 k_{{\rm UV}}^{n+4}}\sum_{\lambda}\lambda^{n+1} \operatorname{Re}[\dot{A}_\lambda(t,k)A_\lambda^*(t,k)]\,,$$ $$ \mathcal{F}_\mathcal{B}^{(n)}(t) = \int\limits_{0}^{k_{{\rm UV}}(t)}\frac{{\rm d} k}{k} \frac{k^{n+5}}{2 \pi^{2}k_{{\rm UV}}^{n+4}} \sum_{\lambda}\lambda^n |A_\lambda(t,k)|^2\,,$$
which may be used to estimate the error of a GEF solution.
1import os 2import numpy as np 3import pandas as pd 4from scipy.interpolate import CubicSpline, PchipInterpolator 5from scipy.integrate import solve_ivp 6from scipy.integrate import quad, simpson 7from .bgtypes import Variable, Constant, Func, BGSystem 8from .utility.mode import mode_equation_classic, bd_classic 9from typing import Tuple, Callable, ClassVar 10from types import NoneType 11from ._docs import generate_docs, docs_mbm 12from tabulate import tabulate 13 14 15class GaugeSpec(dict): 16 def __init__(self, in_dict : dict): 17 """ 18 Initialise the spectrum from a dictionary. 19 20 Parameters 21 ---------- 22 in_dict : dict 23 dictionary with keys `'t'`, `'N'`, `'k'`, `'Ap'`, `'dAp'`, `'Am'`, `'dAm'` 24 25 Raises 26 ------ 27 KeyError 28 if one of the necessary keys is missing. 29 ValueError 30 if `len(in_dict['t']) != len(in_dict['N'])` or if 31 `in_dict['X']).shape != (len(in_dict['k']),len(in_dict['t']))` for `'X'` in `['Ap', 'dAp', 'Am', 'dAm']`. 32 """ 33 for key in ["t", "N", "k", "Ap", "dAp", "Am", "dAm"]: 34 if key not in in_dict.keys(): 35 raise KeyError(f"Missing key: {key}") 36 37 if not(len(in_dict["t"]) == len(in_dict["N"])): 38 raise ValueError("The length of 't' needs to match the length of 'N'") 39 40 shape = (len(in_dict["k"]), len(in_dict["t"])) 41 for key in ["Ap", "dAp", "Am", "dAm"]: 42 if not( in_dict[key].shape == shape): 43 raise ValueError(f"The shape of {key} needs to be {shape}") 44 45 super().__init__(in_dict) 46 47 @classmethod 48 def read_spec(cls, path : str): 49 """ 50 Initialise the class from a file. 51 52 Parameters 53 ---------- 54 path : str 55 path to the data 56 57 Returns 58 ------- 59 spec : GaugeSpec 60 the imported spectrum 61 """ 62 input_df = pd.read_table(path, sep=",") 63 dataAp = input_df.values 64 65 x = np.arange(3,dataAp.shape[1], 4) 66 67 t = np.asarray(dataAp[1:,1], dtype=float) 68 N = np.asarray(dataAp[1:,2], dtype=float) 69 logk = np.array([(complex(dataAp[0,y])).real for y in x]) 70 Ap = np.array([[complex(dataAp[i+1,y]) for i in range(len(N))] for y in x]) 71 dAp = np.array([[complex(dataAp[i+1,y+1]) for i in range(len(N))] for y in x]) 72 Am = np.array([[complex(dataAp[i+1,y+2]) for i in range(len(N))] for y in x]) 73 dAm = np.array([[complex(dataAp[i+1,y+3]) for i in range(len(N))] for y in x]) 74 75 k = 10**logk 76 77 spec = cls({"t":t, "N":N, "k":k, 78 "Ap":Ap, "dAp":dAp, "Am":Am, "dAm":dAm}) 79 80 return spec 81 82 def save_spec(self, path : str): 83 """ 84 Store the spectrum in a file. 85 86 Parameters 87 ---------- 88 path : str 89 path to the data 90 """ 91 thinning = 1 92 N = np.array([np.nan]+list(self["N"][-1::-thinning][::-1])) 93 t = np.array([np.nan]+list(self["t"][-1::-thinning][::-1])) 94 95 dic = {"t":t, "N":N} 96 97 for j, k in enumerate(self["k"]): 98 spec_slice = self.kslice(j) 99 logk = np.log10(spec_slice["k"]) 100 for key in ["Ap", "dAp", "Am", "dAm"]: 101 dictmp = {key + "_" + str(j) :np.array([logk] + 102 list(spec_slice[key][-1::-thinning][::-1]))} 103 dic.update(dictmp) 104 105 dir_name = os.getcwd() 106 107 path = os.path.join(dir_name, path) 108 109 output_df = pd.DataFrame(dic) 110 output_df.to_csv(path) 111 112 return 113 114 def get_dim(self) -> dict: 115 """ 116 Get the number of time coordinates and momenta stored in the spectrum. 117 118 Returns 119 ------- 120 dict 121 a dictionary encoding the spectrum's shape 122 """ 123 return {"kdim":len(self["k"]), "tdim":len(self["t"])} 124 125 def tslice(self, ind : int) -> dict: 126 """ 127 Evaluate the spectrum at time `self['t'][ind]`. 128 129 Parameters 130 ---------- 131 ind : int 132 the temporal index 133 134 Returns 135 ------- 136 spec_slice : SpecSlice 137 the spectrum at time `self['t'][ind]` 138 """ 139 140 spec_slice = {} 141 for key in self.keys(): 142 if key in ["N", "t", "cut"]: 143 spec_slice[key] = self[key][ind] 144 elif key=="k": 145 spec_slice[key] = self[key] 146 else: 147 spec_slice[key] = self[key][:,ind] 148 return SpecSlice(spec_slice) 149 150 def kslice(self, ind : int) -> dict: 151 """ 152 Obtain the time evolution for the momentum `self['k'][ind]`. 153 154 Parameters 155 ---------- 156 ind : int 157 the momentum index. 158 159 Returns 160 ------- 161 spec_slice : dict 162 a dictionary with keys like `self` 163 """ 164 165 spec_slice = {} 166 for key in self.keys(): 167 if key in ["N", "t", "cut"]: 168 spec_slice[key] = self[key] 169 elif key=="k": 170 spec_slice[key] = self[key][ind] 171 else: 172 spec_slice[key] = self[key][ind,:] 173 return spec_slice 174 175 def integrate(self, BG : BGSystem, n : int=0, cutoff="kh", **IntegratorKwargs) -> np.ndarray: 176 r""" 177 Compute the three integrals $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$ for fixed $n$ and each time $t$ in the spectrum. 178 179 If the time coordinates stored in `BG` do not match those stored in the spectrum, $k_{\rm UV}(t)$ is evaluated using interpolation. 180 181 Parameters 182 ---------- 183 BG : BGSystem 184 a system containing the UV cut-off, $k_{\rm UV}(t)$ 185 n : int 186 the integer $n$ 187 cutoff : str 188 the name under which the UV-cutoff is stored in `BG` 189 **IntegratorKwargs : kwargs 190 passed to `SpecSlice.integrate_slice` 191 192 193 Returns 194 ------- 195 FMbM : NDArray 196 $\mathcal{F}_\mathcal{E}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$ stored in a shape (N, 3, 2). 197 The first index corresponds to time $t$, the second index to $\mathcal{X}$, the third index is the integral result (at 0) and its error (at 1). 198 """ 199 200 self._add_cutoff(BG, cutoff) 201 202 tdim = self.get_dim()["tdim"] 203 204 FMbM = np.zeros((tdim, 3,2)) 205 for i in range(tdim): 206 spec_slice = self.tslice(i) 207 FMbM[i,:] = spec_slice.integrate_slice(n=n, **IntegratorKwargs) 208 209 return FMbM 210 211 def estimate_GEF_error(self, BG : BGSystem, references : list[str]=["E", "B", "G"], cutoff : str="kh", 212 err_thr : float=0.025, binning : int|NoneType=5, verbose : bool=True, 213 **IntegratorKwargs) -> Tuple[list, np.ndarray, list]: 214 215 216 og_errs = self._estimate_error(BG, references, cutoff, **IntegratorKwargs) 217 terr = self["t"] 218 219 if binning is not None: 220 errs, terr = self._bin_error(og_errs, binning) 221 else: 222 errs = og_errs 223 224 errs, terr = self._process_error(errs, terr, err_thr) 225 226 if verbose: 227 self._error_summary(errs, terr, references) 228 229 return errs, terr, og_errs 230 231 def merge_spectra(self, spec : 'GaugeSpec'): 232 """ 233 Combine two spectra with the same momenta $k$ but unequal times $t$. 234 235 Parameters 236 ---------- 237 spec : GaugeSpec 238 the second spectrum 239 240 Raises 241 ------ 242 AssertionError 243 if the momenta $k$ do not match up. 244 245 """ 246 assert (spec["k"] == self["k"]).all() 247 248 ind = np.searchsorted(self["t"], spec["t"][0], "left") 249 250 if "cut" in self.keys(): 251 self.pop("cut") 252 253 for key in self.keys(): 254 if key in ["t", "N"]: 255 self[key] = np.concatenate([self[key][:ind], spec[key]]) 256 else: 257 if key != "k": 258 self[key] = np.concatenate([self[key][:,:ind], spec[key]], axis=1) 259 return 260 261 def add_momenta(self, spec : 'GaugeSpec'): 262 """ 263 Combine two spectra with the same times $t$ but unequal momenta $k$. 264 265 Parameters 266 ---------- 267 spec : GaugeSpec 268 the second spectrum 269 270 Raises 271 ------ 272 AssertionError 273 if the times $t$ do not match up. 274 275 """ 276 277 assert (np.round(spec["t"],1) == np.round(self["t"],1)).all() 278 279 newks = [] 280 mds = ["Ap", "dAp", "Am", "dAm"] 281 newmodes = dict( zip(mds, [ [] for i in mds]) ) 282 283 for i, k in enumerate(self["k"]): 284 mask = np.where(k > spec["k"])[0] 285 if len(mask) != 0: 286 newks.append(spec["k"][mask]) 287 spec["k"] = np.delete(spec["k"], mask) 288 for md in mds: 289 newmodes[md].append(spec[md][mask,:]) 290 spec[md] = np.delete(spec[md], mask, axis=0) 291 newks.append(np.array([k])) 292 for md in mds: 293 newmodes[md].append(np.array([self[md][i,:]])) 294 295 mask = np.where(spec["k"] > k)[0] 296 if len(mask) != 0: 297 newks.append(spec["k"][mask]) 298 spec["k"] = np.delete(spec["k"], mask) 299 for md in mds: 300 newmodes[md].append(spec[md][mask,:]) 301 spec[md] = np.delete(spec[md], mask, axis=0) 302 303 self["k"] = np.concatenate(newks) 304 for md in mds: 305 self[md] = np.concatenate(newmodes[md], axis=0) 306 307 return 308 309 def remove_momenta(self, ind): 310 """ 311 Remove the spectrum at momentum `self["k"][ind]`. 312 313 Parameters 314 ---------- 315 ind : int 316 the index at which to remove the spectrum entry 317 """ 318 self["k"] = np.delete(self["k"], ind) 319 for md in ["Ap", "dAp", "Am", "dAm"]: 320 self[md] = np.delete(self[md], ind, axis=0) 321 return 322 323 def _check_overlap(self, t): 324 mask = np.isin(t, self["t"], assume_unique=True) 325 if len(t[mask]) != len(self["t"]): 326 #this should be a warning 327 print("The times in the current GaugeSpec instance are " \ 328 "not a subset of the times in the BGSystem. Reverting to interpolation.") 329 330 return False, None 331 else: 332 return True, mask 333 334 def _add_cutoff(self, BG : BGSystem, cutoff="kh"): 335 units = BG.units 336 BG.units = (False) 337 338 scale = getattr(BG, cutoff) 339 340 bl, mask = self._check_overlap(BG.t) 341 342 if bl: 343 self["cut"] = scale[mask] 344 else: 345 self["cut"] = CubicSpline(BG.t, scale)(self["t"]) 346 347 BG.units = (units) 348 349 return self["cut"] 350 351 def _get_reference(self, BG : BGSystem, references=["E", "B", "G"], cutoff="kh"): 352 units = BG.units 353 BG.units = (False) 354 355 scale = getattr(BG, cutoff) 356 357 bl, mask = self._check_overlap(BG.t) 358 359 Fref = [] 360 for val in references: 361 val_arr = (getattr(BG, val)*(np.exp(BG.N)/scale)**4) 362 if bl: 363 Fref.append( val_arr[mask] ) 364 else: 365 Fref.append( CubicSpline(BG.t, val_arr)(self["t"]) ) 366 367 BG.units = (units) 368 369 return Fref 370 371 372 def _estimate_error(self, BG : BGSystem, references : list[str]=["E", "B", "G"], cutoff : str="kh", 373 **IntegratorKwargs): 374 FMbM = self.integrate(BG, n=0, cutoff=cutoff, **IntegratorKwargs) 375 Fref = self._get_reference(BG, references, cutoff) 376 377 errs = [] 378 379 for i, spl in enumerate(Fref): 380 err = np.minimum(1e3, abs( 1 - (FMbM[:,i,0]+1e-20)/(spl+1e-20) )) #avoid divide by zeros 381 errs.append(np.where(np.isnan(err), 10.0, err)) 382 383 return errs 384 385 def _process_error(self, errs, terr, err_thr): 386 removals = [] 387 for err in errs: 388 #remove the first few errors where the density of modes is low: 389 removals.append(np.where(err < err_thr)[0][0]) 390 #ind = 0 391 ind = max(removals) 392 errs = [err[ind:] for err in errs] 393 terr = terr[ind:] 394 395 return errs, terr 396 397 def _bin_error(self, errs, binning): 398 terr = self["t"] 399 400 bin_terr = terr[::-binning][::-1] 401 tbins = bin_terr[1:] + (bin_terr[1:] - bin_terr[:-1])/2 402 count, _ = np.histogram(terr, bins=tbins) 403 404 bin_errs = [] 405 for err in errs: 406 sum, _ = np.histogram(terr, bins=tbins, weights=err) 407 bin_errs.append(sum/count) 408 bin_terr = bin_terr[2:] 409 410 return bin_errs, bin_terr 411 412 @staticmethod 413 def _error_summary(bin_errs, bin_terr, references : list[str]=["E", "B", "G"]): 414 print("The mode-by-mode comparison finds the following relative deviations from the GEF solution:\n") 415 416 lst = [] 417 for err in bin_errs: 418 rmserr = 100*np.sqrt(np.sum(err**2)/len(err)) 419 finerr = 100*err[-1] 420 maxerr = 100*max(err) 421 422 tmaxerr = bin_terr[np.argmax(err)] 423 tfinerr = bin_terr[-1] 424 425 lst.append([f"{maxerr:.1f}% at {tmaxerr:.1f}", 426 f"{finerr:.1f}% at {tfinerr:.1f}", 427 f"{rmserr:.1f}%"]) 428 429 print(tabulate(lst, headers=["max", "final", "RMS"], showindex=references, tablefmt="simple")+"\n") 430 return 431 432 433# continue from here next time 434class SpecSlice(dict): 435 436 def __init__(self, in_dict): 437 super().__init__(in_dict) 438 439 def integrate_slice(self, n : int=0, integrator="simpson", modethr : int=100, 440 epsabs : float=1e-20, epsrel : float=1e-4, interpolator=PchipInterpolator 441 )-> Tuple[np.ndarray, np.ndarray]: 442 x = (n+4)*np.log(self["k"]/self["cut"]) 443 444 helicities = ["p", "m"] 445 integs = np.zeros((3, 2, len(x))) 446 for i, lam in enumerate(helicities): 447 sgn = np.sign(0.5-i) 448 integs[0,i,:] = self._Espec(lam) 449 integs[1,i,:] = self._Bspec(lam) 450 integs[2,i,:] = sgn*self._Gspec(lam) 451 452 res = np.zeros((3,2)) 453 454 for i in range(3): 455 if integrator=="simpson": 456 msk = np.where(x < 0)[0] 457 if len(msk) < modethr: #cannot trust simpsons integration for too few modes. 458 return res 459 x = x[msk] 460 res[i,0] = (self._simpson_integrate( integs[i,0,msk] ,x) 461 + (-1)**n*self._simpson_integrate(integs[i,1,msk], x) ) 462 res[i,1] = 1e-6*res[i,0] 463 464 elif integrator=="quad": 465 resp = self._quad_integrate( integs[i,0,:], x, epsabs, epsrel, interpolator) 466 resm = self._quad_integrate( (-1)**n*integs[i,1,:], x, epsabs, epsrel, interpolator) 467 res[i,:] = resp +resm 468 469 res = 1/(2*np.pi)**2*res/(n+4) 470 471 res[:,1] = abs(res[:,1]/res[:,0]) 472 return res 473 474 def _Espec(self, lam): 475 return abs(self["dA"+lam])**2 476 477 def _Bspec(self, lam): 478 return abs(self["A"+lam])**2 479 480 def _Gspec(self, lam): 481 return (self["A"+lam].conjugate()*self["dA"+lam]).real 482 483 def _simpson_integrate(self, integrand, x): 484 integrand = integrand*np.exp(x) 485 return simpson(integrand, x) 486 487 def _quad_integrate(self, integrand, x, epsabs : float=1e-20, epsrel : float=1e-4, interpolator=PchipInterpolator): 488 msk = np.where(abs(integrand) > 1e-1*max(epsrel*max(abs(integrand)), epsabs))[0] 489 if len(msk) > 0: 490 spl = interpolator(x, np.arcsinh(integrand)) 491 def f(x): return np.sinh(spl(x))*np.exp(x) 492 val, err = quad(f, x[msk][0], 0., epsabs=epsabs, epsrel=epsrel) 493 return np.array([val, err]) 494 else: 495 return np.nan*np.ones((2)) 496 497class BaseModeSolver: 498 _ode_dict : dict = {"a": "a", "H":"H", "xi":"xi"} 499 _bd_dict = {} 500 501 cutoff : ClassVar[str] = "kh" 502 r"""The name of $k_{\rm UV}(t)$ in the GEF solution.""" 503 504 atol : ClassVar[float] = 1e-3 505 """The default absolute tolerance used in `scipy.integrate.solve_ivp`""" 506 507 necessary_keys : ClassVar[set] = (["t", "N", "a", "H", "xi"] + [cutoff]) 508 """The class expects these attributes in the `.bgtypes.BGSystem` passed on initialisation.""" 509 510 mode_equation = staticmethod(mode_equation_classic) 511 initialise_in_bd = staticmethod(bd_classic) 512 513 def __init__(self, sys : BGSystem): 514 """ 515 Import the evolution of the background dynamics to configure the solver. 516 517 All values in the BGSystem are treated in numerical units. 518 519 Parameters 520 ---------- 521 sys : BGSystem 522 describes the background evolution 523 524 Raises 525 ------ 526 KeyError: 527 if `sys` is missing a `Val` or `Func` object from `necessary_keys` 528 ValueError: 529 if the keys in `necessary_keys` are not `Val` or `Func` objects. 530 """ 531 og_units = sys.units 532 sys.units = False 533 534 #Check that all necessary keys are there: 535 for key in self.necessary_keys: 536 check = True 537 check = ( (key in sys.variable_names()) 538 or 539 (key in sys.constant_names()) 540 or 541 (key in sys.function_names()) 542 ) 543 if not(check): 544 raise KeyError(f"'sys' needs to own an attribute called '{key}'.") 545 546 #store the relevant background evolution parameters 547 self._t = sys.t.value 548 self._N = sys.N.value 549 kh = getattr(sys, self.cutoff) 550 a = np.exp(self._N) 551 552 self._khf = CubicSpline(self._t, kh) 553 554 self._kwargs_ode = {kwarg:None for kwarg in self._ode_dict.values()} 555 self._kwargs_bd = {kwarg:None for kwarg in self._bd_dict.values()} 556 557 #import the values for the mode equation and interpolate 558 for key in self.necessary_keys: 559 obj = getattr(sys, key) 560 if isinstance(obj, Variable): 561 value = getattr(sys, key).value 562 func = CubicSpline(self._t, value) 563 elif isinstance(obj, Func): 564 arg_vals = [] 565 for arg in obj.args: 566 arg_vals.append(getattr(sys, arg.name)) 567 value = obj(*arg_vals) 568 func = CubicSpline(self._t, value) 569 elif isinstance(obj, Constant): 570 value = getattr(sys, key).value 571 func = CubicSpline(self._t, value*np.ones_like(self._t)) 572 else: 573 raise ValueError(f"'{key}' should refer to either a 'Val' or 'Func' subclass.") 574 575 if key in self._ode_dict.keys(): 576 kwarg = self._ode_dict[key] 577 self._kwargs_ode[kwarg] = func 578 if key in self._bd_dict.values(): 579 kwarg = self._bd_dict[key] 580 self._kwargs_bd[kwarg] = func 581 582 #compute the evolution of conformal time for the phases 583 af = CubicSpline(self._t, a) 584 def deta(t, y): return 1/af(t) 585 586 soleta = solve_ivp(deta, [min(self._t), max(self._t)], np.array([0]), t_eval=self._t) 587 588 self.__eta = soleta.y[0,:] 589 590 #find lowest t value corresponding to kh(t) = 10^4 kh(0) 591 self.__tmin = self._t[np.searchsorted(kh, 10**4*kh[0], "right")] 592 593 sys.units = og_units 594 595 return 596 597 def compute_spectrum(self, nvals : int, t_interval : tuple|NoneType=None, **SolverKwargs) -> GaugeSpec: 598 r""" 599 Evolve a gauge-field spectrum from Bunch-Davies initial conditions. 600 601 Evolve the mode functions $A_\lambda(t,k)$ and its derivative in time for $n$ modes between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. 602 The $n$ evolved modes are more densly spaced when $\log k_{\rm UV}(t)$ increases more slowly to ensure a higher density of modes 603 crossing the horizon when backreaction effects are relevant. 604 605 Parameters 606 ---------- 607 nvals : int 608 The number of modes $n$ 609 t_interval : tuple or None 610 set $t_{\rm min}$ and $t_{\rm max}$. If None, $t_{\rm min}$ is given by $10^4 k_{\rm UV}(t_{min}) = k_{\rm UV}(0)$ and $t_{\rm max} = \max t$. 611 **SolverKwargs : kwargs 612 tolerance parameters`atol` and `rtol` passed to `solve_ivp` (default: `atol=self.atol`, `rtol=1e-5`) 613 614 Returns 615 ------- 616 spec : GaugeSpec 617 the gauge-field spectrum 618 """ 619 620 if t_interval is None: 621 t_interval = (self.__tmin, max(self._t)) 622 ks, tstart = self._find_tinit_bd(self._create_k_array(nvals, t_interval), mode="k") 623 624 modes = np.array([self._evolve_from_bd(k, tstart[i], **SolverKwargs) 625 for i, k in enumerate(ks)]) 626 627 spec = GaugeSpec({"t":self._t, "N":self._N, "k":ks, 628 "Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 629 630 return spec 631 632 def update_spectrum(self, spec : GaugeSpec, tstart : float, **SolverKwargs) -> GaugeSpec: 633 r""" 634 Update an existing gauge-field spectrum starting from $t_{\rm start}$ 635 636 Starting from the modes stored in `GaugeSpec`, the function re-evaluates the evolution starting from $t_{\rm start}$. 637 Additional gauge-field modes are evolved starting from Bunch–Davies to account for new modes crossing the horizon 638 at times beyond the original range covered by the input spectrum. 639 640 Parameters 641 ---------- 642 spec : GaugeSpec 643 the spectrum which is to be updated 644 tstart : float 645 the starting time $t_{\rm start}$ 646 **SolverKwargs : kwargs 647 as in `compute_spectrum` 648 649 Returns 650 ------- 651 spec : GaugeSpec 652 the updated gauge-field spectrum 653 """ 654 655 indstart = np.searchsorted(self._t, tstart, "left") 656 teval = self._t[indstart:] 657 Neval = self._N[indstart:] 658 659 tend = teval[-1] 660 indstart = np.searchsorted(spec["t"], teval[0], "left") 661 startspec = spec.tslice(indstart) 662 tstart = startspec["t"] 663 664 #keep mode-evolution from old spectrum for modes with k < 10*kh(tstart) 665 old = np.where(spec["k"] < 10*self._khf(tstart)) 666 new = np.where(spec["k"] >= 10*self._khf(tstart)) 667 668 #Remove modes which need to be renewed from old spectrum: 669 spec.remove_momenta(new) 670 671 #add new modes, adjusting for longer time-span: 672 #rethink this! It seems like overkill 673 n_newmodes = int(len(new)*max((teval[-1] - teval[0])/(max(spec["t"]) - tstart), 1)) 674 675 #Update evolution of modes in spec: 676 kold, tvac = self._find_tinit_bd(spec["k"][old], mode="k") 677 678 updatespec={"t":teval, "N":Neval, "k":kold} 679 680 modes = [] 681 for i, k in enumerate(kold): 682 if tvac[i] > teval[0]: 683 modes.append( self._evolve_from_bd(k, tvac[i], **SolverKwargs) ) 684 else: 685 yini = np.array( 686 [startspec["Ap"][i].real, startspec["dAp"][i].real, 687 startspec["Ap"][i].imag, startspec["dAp"][i].imag, 688 startspec["Am"][i].real, startspec["dAm"][i].real, 689 startspec["Am"][i].imag, startspec["dAm"][i].imag] 690 ) 691 692 modes.append( self._evolve_mode(tstart, yini, k, teval, **SolverKwargs) ) 693 694 modes = np.array(modes) 695 696 updatespec.update({"Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 697 698 spec.merge_spectra(GaugeSpec(updatespec)) 699 700 if n_newmodes > 0: 701 newspec = self.compute_spectrum(n_newmodes, t_interval=(tstart, tend)) 702 #Add new modes 703 spec.add_momenta(newspec) 704 705 return spec 706 707 """ 708 def EvolveSpectrum(self, spec : GaugeSpec, Nstart, Nstep : float=0.1, atol : float|None=None, rtol : float=1e-5) -> GaugeSpec: 709 Neval = np.arange(Nstart, max(self.__N), Nstep) 710 teval = CubicSpline(self.__N, self.__t)(Neval) 711 712 indstart = np.where(spec["N"]<Nstart)[0][-1] 713 startspec = spec.tslice(indstart) 714 715 klen = len(spec["k"]) 716 717 vecode = np.vectorize(lambda t, y, k: self.mode_equation(t, y, k, **self._ode_kwargs), 718 excluded={0, "t"}, 719 signature="(8,n),(n)->(8,n)", 720 ) 721 def ode(t, y): 722 #(k,8) to reshape correctly 723 #transposing to match signature of vecode 724 y = y.reshape(klen,8).T 725 #transposing result s.t. dydt.shape=(k,8) 726 dydt = vecode(t, y, spec["k"]).T 727 #reshape is correct again 728 return dydt.reshape(8*klen) 729 730 spec.merge_spectra(GaugeSpec(newspec)) 731 732 return spec 733 """ 734 735 736 def _evolve_from_bd(self, k : float, tstart : float, 737 atol : float|None=None, rtol : float=1e-5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: 738 """ 739 Evolve a mode with momentum $k$ starting from Bunch-Davies initial conditions. 740 741 Parameters 742 ---------- 743 k : float 744 the momentum $k$ 745 tstart : float 746 the corresponding initialisation time, $t_{\rm init}$ 747 atol : float or None 748 `atol` used by `solve_ivp` if None, use `self.atol` 749 rtol : float 750 `rtol` used by `solve_ivp` 751 752 Returns 753 ------- 754 yp : NDArray 755 the positive helicity mode 756 dyp : NDArray 757 the derivative of the positive helicity mode 758 ym : NDArray 759 the positive helicity mode 760 dym : NDArray 761 the derivative of the negative helicity mode 762 """ 763 764 #Initial conditions for y and dydt for both helicities (rescaled appropriately) 765 yini = self.initialise_in_bd(tstart, k, **self._kwargs_bd) 766 767 teval = self._t 768 769 istart = np.searchsorted(teval, tstart, "right") 770 771 yp, dyp, ym, dym = self._evolve_mode(tstart, yini, k, teval[istart:], atol, rtol) 772 773 #conformal time needed for relative phases 774 eta = self.__eta 775 776 #the mode was in vacuum before tstart 777 yvac = np.array([self.initialise_in_bd(t, k, **self._kwargs_bd) for t in teval[:istart]]).T 778 phasevac = (np.exp(-1j*k*eta[:istart])) 779 vac = yvac * phasevac 780 781 #Create array of mode evolution stringing together vacuum and non-vacuum time evolutions to get evolution from t0 to tend 782 yp = np.concatenate([(vac[0,:] + 1j*vac[2,:]), yp*np.exp(-1j*k*eta[istart])]) 783 dyp = np.concatenate([(vac[1,:] + 1j*vac[3,:]), dyp*np.exp(-1j*k*eta[istart])]) 784 ym = np.concatenate([(vac[4,:] + 1j*vac[6,:]), ym*np.exp(-1j*k*eta[istart])]) 785 dym = np.concatenate([(vac[5,:] + 1j*vac[7,:]), dym*np.exp(-1j*k*eta[istart])]) 786 787 return yp, dyp, ym, dym 788 789 def _evolve_mode(self, tini : float, yini : np.ndarray, k : float, teval : np.ndarray, 790 atol : float|None=None, rtol : float=1e-5): 791 """ 792 Evolve a mode of momentum $k$ from $t_{\rm ini}$. 793 794 Parameters 795 ---------- 796 tini : float 797 the initial time $t_{\rm ini}$ 798 yini : NDArray 799 (8,) array containing the initial data 800 k : float 801 the momentum $k$ 802 teval : NDArray 803 the times at which the returned solution is evaluated 804 atol : float or None 805 `atol` used by `solve_ivp` if None, use `self.atol` 806 rtol : float 807 `rtol` used by `solve_ivp` 808 809 Returns 810 ------- 811 yp : NDArray 812 the positive helicity mode 813 dyp : NDArray 814 the derivative of the positive helicity mode 815 ym : NDArray 816 the positive helicity mode 817 dym : NDArray 818 the derivative of the negative helicity mode 819 """ 820 #Define ODE 821 def ode(t, y): return self.mode_equation(t, y, k, **self._kwargs_ode) 822 823 if atol is None: 824 atol = self.atol 825 826 #Solve differential equation from tstart to tmax 827 sol = solve_ivp(ode, [tini, max(teval)], yini, t_eval=teval, method="RK45", atol=atol, rtol=rtol) 828 829 yp = (sol.y[0,:] + 1j*sol.y[2,:]) 830 dyp = (sol.y[1,:] + 1j*sol.y[3,:]) 831 ym = (sol.y[4,:] + 1j*sol.y[6,:]) 832 dym = (sol.y[5,:] + 1j*sol.y[7,:]) 833 834 return yp, dyp, ym, dym 835 836 837 def _create_k_array(self, nvals : int, t_interval : tuple) -> np.ndarray: 838 r""" 839 Create an array of $n$ momenta 840 841 The $n$ modes are generated between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. 842 First, $m$ of modes $k = k_{\rm UV}$ with t evenly spaced between $t_{\rm min}$ and $t_{\rm max}$ are generated. 843 As $k_{\rm UV}$ is monotonic but not strictly monotonic, $m\leq n$. To fill up to $n$ modes, $n-m$ additional modes are 844 created between the existing times $(t_{i},t_{i+1})$ moving backwards from $t_{\rm max}$ to favour larger momenta. 845 846 Parameters 847 ---------- 848 nvals : int 849 The size of the output array 850 t_interval : tuple 851 sets $t_{\rm min}$ and $t_{\rm max}$ 852 853 Returns 854 ------- 855 NDArray 856 an array of momenta with size nvals 857 """ 858 859 #create an array of values log(10*kh(t)) 860 logks = np.round( np.log(10*self._khf(np.linspace(t_interval[0], t_interval[1], nvals))), 3) 861 #filter out all values that are repeating (kh is not strictly monotonous) 862 logks = np.unique((logks)) 863 864 #fill up the array of ks values with additional elements between gaps, favouring larger k 865 while len(logks) < nvals: 866 num_newk = nvals - len(logks) 867 if num_newk >= len(logks): 868 newvals = (logks[1:] + logks[:-1])/2 869 else: 870 newvals = (logks[-num_newk:] + logks[-num_newk-1:-1])/2 871 logks = np.sort(np.concatenate([logks, newvals])) 872 return np.exp(logks) 873 874 def _find_tinit_bd(self, init : np.ndarray, mode : str="k") -> Tuple[np.ndarray, np.ndarray]: 875 """ 876 Determines the pair of $k$ and $t$ satisfying $k = 10^(5/2)k_h(t)$. 877 878 Depending on `mode`, `init` may be a time coordinate (`mode='t'`), $e$-folds (`mode='N'`) or momentum (`mode='k'`). 879 880 Parameters 881 ---------- 882 init : array 883 the input array (t, N, or k) 884 mode : str 885 indicate the type of `init` 886 887 Returns 888 ------- 889 ks : NDarray 890 an array of momenta 891 tstart : NDarray 892 an array of times 893 894 Raises 895 ------ 896 KeyError 897 if `mode` is not 't, 'k' or 'N' 898 """ 899 900 if mode=="t": 901 tstart = init 902 ks = 10**(5/2)*self._khf(tstart) 903 904 elif mode=="k": 905 ks = init 906 907 tstart = [] 908 for k in ks: 909 ttmp = self._t[np.searchsorted(10**(5/2)*self._khf(self._t), k, "right")] 910 tstart.append(ttmp) 911 tstart = np.array(tstart) 912 913 elif mode=="N": 914 tstart = CubicSpline(self._N, self._t)(init) 915 ks = 10**(5/2)*self._khf(tstart) 916 917 else: 918 raise KeyError("'mode' must be 't', 'k' or 'N'") 919 920 return ks, tstart 921 922 923def ModeSolver(new_mode_eq : Callable, ode_keys : list[str], new_bd_init : Callable, init_keys : list[str], new_cutoff : str="kh", default_atol : float=1e-3): 924 class CustomModeSolver(BaseModeSolver): 925 """ 926 A subclass of `BaseModeSolver` new mode equation and initial condition adapted to a new GEF model 927 928 It Inherits all methods from `BaseModeSolver` but changes the attributes: 929 - `BaseModeSolver.mode_equation` 930 - `BaseModeSolver.initialise_in_bd` 931 - `BaseModeSolver.necessary_keys` 932 - `BaseModeSolver.cutoff` 933 - `BaseModeSolver.atol` 934 935 This entails that `compute_spectrum` will now evolve modes according to `initialise_in_bd` and `mode_equation`. 936 """ 937 938 #Overwrite class attibutes of ModeByMode with new mode equations, boundary conditions and default tolerances. 939 mode_equation = staticmethod(new_mode_eq) 940 _ode_dict = {attr.name:kwarg for kwarg, attr in ode_keys.items()} 941 942 initialise_in_bd = staticmethod(new_bd_init) 943 _bd_dict = {attr.name:kwarg for kwarg, attr in init_keys.items()} 944 cutoff = new_cutoff 945 946 necessary_keys = list( {"t", "N", new_cutoff}.union(set(_ode_dict.keys())).union(set(_bd_dict.keys())) ) 947 948 atol=default_atol 949 950 CustomModeSolver.__qualname__ = "ModeSolver" 951 CustomModeSolver.__module__ = __name__ 952 return CustomModeSolver 953 954 955#define longer method docs from docs_mbm: 956generate_docs(docs_mbm.DOCS) 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972
16class GaugeSpec(dict): 17 def __init__(self, in_dict : dict): 18 """ 19 Initialise the spectrum from a dictionary. 20 21 Parameters 22 ---------- 23 in_dict : dict 24 dictionary with keys `'t'`, `'N'`, `'k'`, `'Ap'`, `'dAp'`, `'Am'`, `'dAm'` 25 26 Raises 27 ------ 28 KeyError 29 if one of the necessary keys is missing. 30 ValueError 31 if `len(in_dict['t']) != len(in_dict['N'])` or if 32 `in_dict['X']).shape != (len(in_dict['k']),len(in_dict['t']))` for `'X'` in `['Ap', 'dAp', 'Am', 'dAm']`. 33 """ 34 for key in ["t", "N", "k", "Ap", "dAp", "Am", "dAm"]: 35 if key not in in_dict.keys(): 36 raise KeyError(f"Missing key: {key}") 37 38 if not(len(in_dict["t"]) == len(in_dict["N"])): 39 raise ValueError("The length of 't' needs to match the length of 'N'") 40 41 shape = (len(in_dict["k"]), len(in_dict["t"])) 42 for key in ["Ap", "dAp", "Am", "dAm"]: 43 if not( in_dict[key].shape == shape): 44 raise ValueError(f"The shape of {key} needs to be {shape}") 45 46 super().__init__(in_dict) 47 48 @classmethod 49 def read_spec(cls, path : str): 50 """ 51 Initialise the class from a file. 52 53 Parameters 54 ---------- 55 path : str 56 path to the data 57 58 Returns 59 ------- 60 spec : GaugeSpec 61 the imported spectrum 62 """ 63 input_df = pd.read_table(path, sep=",") 64 dataAp = input_df.values 65 66 x = np.arange(3,dataAp.shape[1], 4) 67 68 t = np.asarray(dataAp[1:,1], dtype=float) 69 N = np.asarray(dataAp[1:,2], dtype=float) 70 logk = np.array([(complex(dataAp[0,y])).real for y in x]) 71 Ap = np.array([[complex(dataAp[i+1,y]) for i in range(len(N))] for y in x]) 72 dAp = np.array([[complex(dataAp[i+1,y+1]) for i in range(len(N))] for y in x]) 73 Am = np.array([[complex(dataAp[i+1,y+2]) for i in range(len(N))] for y in x]) 74 dAm = np.array([[complex(dataAp[i+1,y+3]) for i in range(len(N))] for y in x]) 75 76 k = 10**logk 77 78 spec = cls({"t":t, "N":N, "k":k, 79 "Ap":Ap, "dAp":dAp, "Am":Am, "dAm":dAm}) 80 81 return spec 82 83 def save_spec(self, path : str): 84 """ 85 Store the spectrum in a file. 86 87 Parameters 88 ---------- 89 path : str 90 path to the data 91 """ 92 thinning = 1 93 N = np.array([np.nan]+list(self["N"][-1::-thinning][::-1])) 94 t = np.array([np.nan]+list(self["t"][-1::-thinning][::-1])) 95 96 dic = {"t":t, "N":N} 97 98 for j, k in enumerate(self["k"]): 99 spec_slice = self.kslice(j) 100 logk = np.log10(spec_slice["k"]) 101 for key in ["Ap", "dAp", "Am", "dAm"]: 102 dictmp = {key + "_" + str(j) :np.array([logk] + 103 list(spec_slice[key][-1::-thinning][::-1]))} 104 dic.update(dictmp) 105 106 dir_name = os.getcwd() 107 108 path = os.path.join(dir_name, path) 109 110 output_df = pd.DataFrame(dic) 111 output_df.to_csv(path) 112 113 return 114 115 def get_dim(self) -> dict: 116 """ 117 Get the number of time coordinates and momenta stored in the spectrum. 118 119 Returns 120 ------- 121 dict 122 a dictionary encoding the spectrum's shape 123 """ 124 return {"kdim":len(self["k"]), "tdim":len(self["t"])} 125 126 def tslice(self, ind : int) -> dict: 127 """ 128 Evaluate the spectrum at time `self['t'][ind]`. 129 130 Parameters 131 ---------- 132 ind : int 133 the temporal index 134 135 Returns 136 ------- 137 spec_slice : SpecSlice 138 the spectrum at time `self['t'][ind]` 139 """ 140 141 spec_slice = {} 142 for key in self.keys(): 143 if key in ["N", "t", "cut"]: 144 spec_slice[key] = self[key][ind] 145 elif key=="k": 146 spec_slice[key] = self[key] 147 else: 148 spec_slice[key] = self[key][:,ind] 149 return SpecSlice(spec_slice) 150 151 def kslice(self, ind : int) -> dict: 152 """ 153 Obtain the time evolution for the momentum `self['k'][ind]`. 154 155 Parameters 156 ---------- 157 ind : int 158 the momentum index. 159 160 Returns 161 ------- 162 spec_slice : dict 163 a dictionary with keys like `self` 164 """ 165 166 spec_slice = {} 167 for key in self.keys(): 168 if key in ["N", "t", "cut"]: 169 spec_slice[key] = self[key] 170 elif key=="k": 171 spec_slice[key] = self[key][ind] 172 else: 173 spec_slice[key] = self[key][ind,:] 174 return spec_slice 175 176 def integrate(self, BG : BGSystem, n : int=0, cutoff="kh", **IntegratorKwargs) -> np.ndarray: 177 r""" 178 Compute the three integrals $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$ for fixed $n$ and each time $t$ in the spectrum. 179 180 If the time coordinates stored in `BG` do not match those stored in the spectrum, $k_{\rm UV}(t)$ is evaluated using interpolation. 181 182 Parameters 183 ---------- 184 BG : BGSystem 185 a system containing the UV cut-off, $k_{\rm UV}(t)$ 186 n : int 187 the integer $n$ 188 cutoff : str 189 the name under which the UV-cutoff is stored in `BG` 190 **IntegratorKwargs : kwargs 191 passed to `SpecSlice.integrate_slice` 192 193 194 Returns 195 ------- 196 FMbM : NDArray 197 $\mathcal{F}_\mathcal{E}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$ stored in a shape (N, 3, 2). 198 The first index corresponds to time $t$, the second index to $\mathcal{X}$, the third index is the integral result (at 0) and its error (at 1). 199 """ 200 201 self._add_cutoff(BG, cutoff) 202 203 tdim = self.get_dim()["tdim"] 204 205 FMbM = np.zeros((tdim, 3,2)) 206 for i in range(tdim): 207 spec_slice = self.tslice(i) 208 FMbM[i,:] = spec_slice.integrate_slice(n=n, **IntegratorKwargs) 209 210 return FMbM 211 212 def estimate_GEF_error(self, BG : BGSystem, references : list[str]=["E", "B", "G"], cutoff : str="kh", 213 err_thr : float=0.025, binning : int|NoneType=5, verbose : bool=True, 214 **IntegratorKwargs) -> Tuple[list, np.ndarray, list]: 215 216 217 og_errs = self._estimate_error(BG, references, cutoff, **IntegratorKwargs) 218 terr = self["t"] 219 220 if binning is not None: 221 errs, terr = self._bin_error(og_errs, binning) 222 else: 223 errs = og_errs 224 225 errs, terr = self._process_error(errs, terr, err_thr) 226 227 if verbose: 228 self._error_summary(errs, terr, references) 229 230 return errs, terr, og_errs 231 232 def merge_spectra(self, spec : 'GaugeSpec'): 233 """ 234 Combine two spectra with the same momenta $k$ but unequal times $t$. 235 236 Parameters 237 ---------- 238 spec : GaugeSpec 239 the second spectrum 240 241 Raises 242 ------ 243 AssertionError 244 if the momenta $k$ do not match up. 245 246 """ 247 assert (spec["k"] == self["k"]).all() 248 249 ind = np.searchsorted(self["t"], spec["t"][0], "left") 250 251 if "cut" in self.keys(): 252 self.pop("cut") 253 254 for key in self.keys(): 255 if key in ["t", "N"]: 256 self[key] = np.concatenate([self[key][:ind], spec[key]]) 257 else: 258 if key != "k": 259 self[key] = np.concatenate([self[key][:,:ind], spec[key]], axis=1) 260 return 261 262 def add_momenta(self, spec : 'GaugeSpec'): 263 """ 264 Combine two spectra with the same times $t$ but unequal momenta $k$. 265 266 Parameters 267 ---------- 268 spec : GaugeSpec 269 the second spectrum 270 271 Raises 272 ------ 273 AssertionError 274 if the times $t$ do not match up. 275 276 """ 277 278 assert (np.round(spec["t"],1) == np.round(self["t"],1)).all() 279 280 newks = [] 281 mds = ["Ap", "dAp", "Am", "dAm"] 282 newmodes = dict( zip(mds, [ [] for i in mds]) ) 283 284 for i, k in enumerate(self["k"]): 285 mask = np.where(k > spec["k"])[0] 286 if len(mask) != 0: 287 newks.append(spec["k"][mask]) 288 spec["k"] = np.delete(spec["k"], mask) 289 for md in mds: 290 newmodes[md].append(spec[md][mask,:]) 291 spec[md] = np.delete(spec[md], mask, axis=0) 292 newks.append(np.array([k])) 293 for md in mds: 294 newmodes[md].append(np.array([self[md][i,:]])) 295 296 mask = np.where(spec["k"] > k)[0] 297 if len(mask) != 0: 298 newks.append(spec["k"][mask]) 299 spec["k"] = np.delete(spec["k"], mask) 300 for md in mds: 301 newmodes[md].append(spec[md][mask,:]) 302 spec[md] = np.delete(spec[md], mask, axis=0) 303 304 self["k"] = np.concatenate(newks) 305 for md in mds: 306 self[md] = np.concatenate(newmodes[md], axis=0) 307 308 return 309 310 def remove_momenta(self, ind): 311 """ 312 Remove the spectrum at momentum `self["k"][ind]`. 313 314 Parameters 315 ---------- 316 ind : int 317 the index at which to remove the spectrum entry 318 """ 319 self["k"] = np.delete(self["k"], ind) 320 for md in ["Ap", "dAp", "Am", "dAm"]: 321 self[md] = np.delete(self[md], ind, axis=0) 322 return 323 324 def _check_overlap(self, t): 325 mask = np.isin(t, self["t"], assume_unique=True) 326 if len(t[mask]) != len(self["t"]): 327 #this should be a warning 328 print("The times in the current GaugeSpec instance are " \ 329 "not a subset of the times in the BGSystem. Reverting to interpolation.") 330 331 return False, None 332 else: 333 return True, mask 334 335 def _add_cutoff(self, BG : BGSystem, cutoff="kh"): 336 units = BG.units 337 BG.units = (False) 338 339 scale = getattr(BG, cutoff) 340 341 bl, mask = self._check_overlap(BG.t) 342 343 if bl: 344 self["cut"] = scale[mask] 345 else: 346 self["cut"] = CubicSpline(BG.t, scale)(self["t"]) 347 348 BG.units = (units) 349 350 return self["cut"] 351 352 def _get_reference(self, BG : BGSystem, references=["E", "B", "G"], cutoff="kh"): 353 units = BG.units 354 BG.units = (False) 355 356 scale = getattr(BG, cutoff) 357 358 bl, mask = self._check_overlap(BG.t) 359 360 Fref = [] 361 for val in references: 362 val_arr = (getattr(BG, val)*(np.exp(BG.N)/scale)**4) 363 if bl: 364 Fref.append( val_arr[mask] ) 365 else: 366 Fref.append( CubicSpline(BG.t, val_arr)(self["t"]) ) 367 368 BG.units = (units) 369 370 return Fref 371 372 373 def _estimate_error(self, BG : BGSystem, references : list[str]=["E", "B", "G"], cutoff : str="kh", 374 **IntegratorKwargs): 375 FMbM = self.integrate(BG, n=0, cutoff=cutoff, **IntegratorKwargs) 376 Fref = self._get_reference(BG, references, cutoff) 377 378 errs = [] 379 380 for i, spl in enumerate(Fref): 381 err = np.minimum(1e3, abs( 1 - (FMbM[:,i,0]+1e-20)/(spl+1e-20) )) #avoid divide by zeros 382 errs.append(np.where(np.isnan(err), 10.0, err)) 383 384 return errs 385 386 def _process_error(self, errs, terr, err_thr): 387 removals = [] 388 for err in errs: 389 #remove the first few errors where the density of modes is low: 390 removals.append(np.where(err < err_thr)[0][0]) 391 #ind = 0 392 ind = max(removals) 393 errs = [err[ind:] for err in errs] 394 terr = terr[ind:] 395 396 return errs, terr 397 398 def _bin_error(self, errs, binning): 399 terr = self["t"] 400 401 bin_terr = terr[::-binning][::-1] 402 tbins = bin_terr[1:] + (bin_terr[1:] - bin_terr[:-1])/2 403 count, _ = np.histogram(terr, bins=tbins) 404 405 bin_errs = [] 406 for err in errs: 407 sum, _ = np.histogram(terr, bins=tbins, weights=err) 408 bin_errs.append(sum/count) 409 bin_terr = bin_terr[2:] 410 411 return bin_errs, bin_terr 412 413 @staticmethod 414 def _error_summary(bin_errs, bin_terr, references : list[str]=["E", "B", "G"]): 415 print("The mode-by-mode comparison finds the following relative deviations from the GEF solution:\n") 416 417 lst = [] 418 for err in bin_errs: 419 rmserr = 100*np.sqrt(np.sum(err**2)/len(err)) 420 finerr = 100*err[-1] 421 maxerr = 100*max(err) 422 423 tmaxerr = bin_terr[np.argmax(err)] 424 tfinerr = bin_terr[-1] 425 426 lst.append([f"{maxerr:.1f}% at {tmaxerr:.1f}", 427 f"{finerr:.1f}% at {tfinerr:.1f}", 428 f"{rmserr:.1f}%"]) 429 430 print(tabulate(lst, headers=["max", "final", "RMS"], showindex=references, tablefmt="simple")+"\n") 431 return
A class representing a spectrum of gauge-field modes as a function of time.
This class inherits from dict and needs the following keys:
't', 'N', 'k', 'Ap', 'dAp', 'Am', 'dAm'
All quantities are represented in numerical units (see geff.bgtypes.BGSystem).
The spectrum can be evaluated at certain times $t$ or for certain momenta $k$ by using tslice and kslice
Furthermore, the spectrum contained in the object can be integrated to compute gauge-field expectation values.
The result can be used to estimate the error of a GEF run.
Attributes
- t (NDArray): the cosmic time coordinates $t$ of the spectrum
- N (NDArray): the $e$-folds as a function of cosmic time, $N(t)$
- k (NDarray): the momenta $k$ at which the spectrum is evaluated
- Ap, Am (NDarray): the mode functions, $\sqrt{2 k} A_\pm(k, t)$
- dAp, dAm (NDarray): the mode-function derivatives, $\sqrt{2/k} \, e^{N(t)}\dot{A}_\pm(k, t)$
17 def __init__(self, in_dict : dict): 18 """ 19 Initialise the spectrum from a dictionary. 20 21 Parameters 22 ---------- 23 in_dict : dict 24 dictionary with keys `'t'`, `'N'`, `'k'`, `'Ap'`, `'dAp'`, `'Am'`, `'dAm'` 25 26 Raises 27 ------ 28 KeyError 29 if one of the necessary keys is missing. 30 ValueError 31 if `len(in_dict['t']) != len(in_dict['N'])` or if 32 `in_dict['X']).shape != (len(in_dict['k']),len(in_dict['t']))` for `'X'` in `['Ap', 'dAp', 'Am', 'dAm']`. 33 """ 34 for key in ["t", "N", "k", "Ap", "dAp", "Am", "dAm"]: 35 if key not in in_dict.keys(): 36 raise KeyError(f"Missing key: {key}") 37 38 if not(len(in_dict["t"]) == len(in_dict["N"])): 39 raise ValueError("The length of 't' needs to match the length of 'N'") 40 41 shape = (len(in_dict["k"]), len(in_dict["t"])) 42 for key in ["Ap", "dAp", "Am", "dAm"]: 43 if not( in_dict[key].shape == shape): 44 raise ValueError(f"The shape of {key} needs to be {shape}") 45 46 super().__init__(in_dict)
Initialise the spectrum from a dictionary.
Parameters
- in_dict (dict):
dictionary with keys
't','N','k','Ap','dAp','Am','dAm'
Raises
- KeyError: if one of the necessary keys is missing.
- ValueError: if
len(in_dict['t']) != len(in_dict['N'])or ifin_dict['X']).shape != (len(in_dict['k']),len(in_dict['t']))for'X'in['Ap', 'dAp', 'Am', 'dAm'].
48 @classmethod 49 def read_spec(cls, path : str): 50 """ 51 Initialise the class from a file. 52 53 Parameters 54 ---------- 55 path : str 56 path to the data 57 58 Returns 59 ------- 60 spec : GaugeSpec 61 the imported spectrum 62 """ 63 input_df = pd.read_table(path, sep=",") 64 dataAp = input_df.values 65 66 x = np.arange(3,dataAp.shape[1], 4) 67 68 t = np.asarray(dataAp[1:,1], dtype=float) 69 N = np.asarray(dataAp[1:,2], dtype=float) 70 logk = np.array([(complex(dataAp[0,y])).real for y in x]) 71 Ap = np.array([[complex(dataAp[i+1,y]) for i in range(len(N))] for y in x]) 72 dAp = np.array([[complex(dataAp[i+1,y+1]) for i in range(len(N))] for y in x]) 73 Am = np.array([[complex(dataAp[i+1,y+2]) for i in range(len(N))] for y in x]) 74 dAm = np.array([[complex(dataAp[i+1,y+3]) for i in range(len(N))] for y in x]) 75 76 k = 10**logk 77 78 spec = cls({"t":t, "N":N, "k":k, 79 "Ap":Ap, "dAp":dAp, "Am":Am, "dAm":dAm}) 80 81 return spec
Initialise the class from a file.
Parameters
- path (str): path to the data
Returns
- spec (GaugeSpec): the imported spectrum
83 def save_spec(self, path : str): 84 """ 85 Store the spectrum in a file. 86 87 Parameters 88 ---------- 89 path : str 90 path to the data 91 """ 92 thinning = 1 93 N = np.array([np.nan]+list(self["N"][-1::-thinning][::-1])) 94 t = np.array([np.nan]+list(self["t"][-1::-thinning][::-1])) 95 96 dic = {"t":t, "N":N} 97 98 for j, k in enumerate(self["k"]): 99 spec_slice = self.kslice(j) 100 logk = np.log10(spec_slice["k"]) 101 for key in ["Ap", "dAp", "Am", "dAm"]: 102 dictmp = {key + "_" + str(j) :np.array([logk] + 103 list(spec_slice[key][-1::-thinning][::-1]))} 104 dic.update(dictmp) 105 106 dir_name = os.getcwd() 107 108 path = os.path.join(dir_name, path) 109 110 output_df = pd.DataFrame(dic) 111 output_df.to_csv(path) 112 113 return
Store the spectrum in a file.
Parameters
- path (str): path to the data
115 def get_dim(self) -> dict: 116 """ 117 Get the number of time coordinates and momenta stored in the spectrum. 118 119 Returns 120 ------- 121 dict 122 a dictionary encoding the spectrum's shape 123 """ 124 return {"kdim":len(self["k"]), "tdim":len(self["t"])}
Get the number of time coordinates and momenta stored in the spectrum.
Returns
- dict: a dictionary encoding the spectrum's shape
126 def tslice(self, ind : int) -> dict: 127 """ 128 Evaluate the spectrum at time `self['t'][ind]`. 129 130 Parameters 131 ---------- 132 ind : int 133 the temporal index 134 135 Returns 136 ------- 137 spec_slice : SpecSlice 138 the spectrum at time `self['t'][ind]` 139 """ 140 141 spec_slice = {} 142 for key in self.keys(): 143 if key in ["N", "t", "cut"]: 144 spec_slice[key] = self[key][ind] 145 elif key=="k": 146 spec_slice[key] = self[key] 147 else: 148 spec_slice[key] = self[key][:,ind] 149 return SpecSlice(spec_slice)
Evaluate the spectrum at time self['t'][ind].
Parameters
- ind (int): the temporal index
Returns
- spec_slice (SpecSlice):
the spectrum at time
self['t'][ind]
151 def kslice(self, ind : int) -> dict: 152 """ 153 Obtain the time evolution for the momentum `self['k'][ind]`. 154 155 Parameters 156 ---------- 157 ind : int 158 the momentum index. 159 160 Returns 161 ------- 162 spec_slice : dict 163 a dictionary with keys like `self` 164 """ 165 166 spec_slice = {} 167 for key in self.keys(): 168 if key in ["N", "t", "cut"]: 169 spec_slice[key] = self[key] 170 elif key=="k": 171 spec_slice[key] = self[key][ind] 172 else: 173 spec_slice[key] = self[key][ind,:] 174 return spec_slice
Obtain the time evolution for the momentum self['k'][ind].
Parameters
- ind (int): the momentum index.
Returns
- spec_slice (dict):
a dictionary with keys like
self
176 def integrate(self, BG : BGSystem, n : int=0, cutoff="kh", **IntegratorKwargs) -> np.ndarray: 177 r""" 178 Compute the three integrals $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$ for fixed $n$ and each time $t$ in the spectrum. 179 180 If the time coordinates stored in `BG` do not match those stored in the spectrum, $k_{\rm UV}(t)$ is evaluated using interpolation. 181 182 Parameters 183 ---------- 184 BG : BGSystem 185 a system containing the UV cut-off, $k_{\rm UV}(t)$ 186 n : int 187 the integer $n$ 188 cutoff : str 189 the name under which the UV-cutoff is stored in `BG` 190 **IntegratorKwargs : kwargs 191 passed to `SpecSlice.integrate_slice` 192 193 194 Returns 195 ------- 196 FMbM : NDArray 197 $\mathcal{F}_\mathcal{E}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$ stored in a shape (N, 3, 2). 198 The first index corresponds to time $t$, the second index to $\mathcal{X}$, the third index is the integral result (at 0) and its error (at 1). 199 """ 200 201 self._add_cutoff(BG, cutoff) 202 203 tdim = self.get_dim()["tdim"] 204 205 FMbM = np.zeros((tdim, 3,2)) 206 for i in range(tdim): 207 spec_slice = self.tslice(i) 208 FMbM[i,:] = spec_slice.integrate_slice(n=n, **IntegratorKwargs) 209 210 return FMbM
Compute the three integrals $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$ for fixed $n$ and each time $t$ in the spectrum.
If the time coordinates stored in BG do not match those stored in the spectrum, $k_{\rm UV}(t)$ is evaluated using interpolation.
Parameters
- BG (BGSystem): a system containing the UV cut-off, $k_{\rm UV}(t)$
- n (int): the integer $n$
- cutoff (str):
the name under which the UV-cutoff is stored in
BG - **IntegratorKwargs (kwargs):
passed to
SpecSlice.integrate_slice
Returns
- FMbM (NDArray): $\mathcal{F}_\mathcal{E}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$ stored in a shape (N, 3, 2). The first index corresponds to time $t$, the second index to $\mathcal{X}$, the third index is the integral result (at 0) and its error (at 1).
212 def estimate_GEF_error(self, BG : BGSystem, references : list[str]=["E", "B", "G"], cutoff : str="kh", 213 err_thr : float=0.025, binning : int|NoneType=5, verbose : bool=True, 214 **IntegratorKwargs) -> Tuple[list, np.ndarray, list]: 215 216 217 og_errs = self._estimate_error(BG, references, cutoff, **IntegratorKwargs) 218 terr = self["t"] 219 220 if binning is not None: 221 errs, terr = self._bin_error(og_errs, binning) 222 else: 223 errs = og_errs 224 225 errs, terr = self._process_error(errs, terr, err_thr) 226 227 if verbose: 228 self._error_summary(errs, terr, references) 229 230 return errs, terr, og_errs
Estimate the relative deviation between a GEF solution and the mode spectrum by computing
$$\varepsilon_\mathcal{X} = \left|1 - \frac{\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm MbM}}{\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm GEF}}\right|$$
for $\mathcal{X} = \mathcal{E},\,\mathcal{B},\,\mathcal{G}$. Here, $\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm MbM}$ are the integrals computed by integrate,
$\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm GEF}$ refer to the same quantity in BG.
If the time coordinate of BG does not align with the spectrum, its values are interpolated.
Because $k_{\rm UV}(t)$ increases monotonically, the spectrum contains only few relevant modes $k < k_{\rm UV}(t)$ at early times. This poses a problem for the numerical integration of $\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm MbM}$. To avoid claiming a disagreement between $\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm MbM}$ and $\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm GEF}$ due to this effect, errors with $\varepsilon_\mathcal{X} > \varepsilon_{\rm thr}$ are discarded until the first time when $\varepsilon_\mathcal{X} < \varepsilon_{\rm thr}$.
As the integration result fluctuates significantly for few momenta $k < k_{\rm UV}(t)$ when using simpson,
the errors can be binned by setting binning. The reported error is the average over a bin of width $(t_{i}, t_{i+\Delta})$ with $\Delta$ set by binning.
This binned error is then associated to the time $(t_{i} + t_{i+\Delta})/2$. For quad, binning can also be set to None.
For details on the integration methods simpson and quad, see SpecSlice.integrate_slice.
Parameters
- BG (BGSystem): the system where $\big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm GEF}$ are stored
- references (list of str):
the names where $(k_{\rm UV}/a)^4 \big(\mathcal{F}_\mathcal{X}^{(0)}\big)_{\rm GEF}$ are stored in
BG - cutoff (str):
the name where the UV-cutoff, $k_{\rm UV}$, is stored in
BG - err_thr (float): the error threshold $\varepsilon_{\rm thr}$
- binning (int or None):
the bin size $\Delta$ (no binning if
None) - verbose (bool):
if
True, print a summary of the errors - **IntegratorKwargs (kwargs):
passed to
SpecSlice.integrate_slice
Returns
- errs (list of NDArray): a list of the binned errors with entries $[\varepsilon_\mathcal{E},\varepsilon_\mathcal{B}, \varepsilon_\mathcal{G}]$
- terr (NDArray):
the time coordinates corresponding to
errs - og_errs (list of NDArray):
the same as
errsbut without binning
232 def merge_spectra(self, spec : 'GaugeSpec'): 233 """ 234 Combine two spectra with the same momenta $k$ but unequal times $t$. 235 236 Parameters 237 ---------- 238 spec : GaugeSpec 239 the second spectrum 240 241 Raises 242 ------ 243 AssertionError 244 if the momenta $k$ do not match up. 245 246 """ 247 assert (spec["k"] == self["k"]).all() 248 249 ind = np.searchsorted(self["t"], spec["t"][0], "left") 250 251 if "cut" in self.keys(): 252 self.pop("cut") 253 254 for key in self.keys(): 255 if key in ["t", "N"]: 256 self[key] = np.concatenate([self[key][:ind], spec[key]]) 257 else: 258 if key != "k": 259 self[key] = np.concatenate([self[key][:,:ind], spec[key]], axis=1) 260 return
Combine two spectra with the same momenta $k$ but unequal times $t$.
Parameters
- spec (GaugeSpec): the second spectrum
Raises
- AssertionError: if the momenta $k$ do not match up.
262 def add_momenta(self, spec : 'GaugeSpec'): 263 """ 264 Combine two spectra with the same times $t$ but unequal momenta $k$. 265 266 Parameters 267 ---------- 268 spec : GaugeSpec 269 the second spectrum 270 271 Raises 272 ------ 273 AssertionError 274 if the times $t$ do not match up. 275 276 """ 277 278 assert (np.round(spec["t"],1) == np.round(self["t"],1)).all() 279 280 newks = [] 281 mds = ["Ap", "dAp", "Am", "dAm"] 282 newmodes = dict( zip(mds, [ [] for i in mds]) ) 283 284 for i, k in enumerate(self["k"]): 285 mask = np.where(k > spec["k"])[0] 286 if len(mask) != 0: 287 newks.append(spec["k"][mask]) 288 spec["k"] = np.delete(spec["k"], mask) 289 for md in mds: 290 newmodes[md].append(spec[md][mask,:]) 291 spec[md] = np.delete(spec[md], mask, axis=0) 292 newks.append(np.array([k])) 293 for md in mds: 294 newmodes[md].append(np.array([self[md][i,:]])) 295 296 mask = np.where(spec["k"] > k)[0] 297 if len(mask) != 0: 298 newks.append(spec["k"][mask]) 299 spec["k"] = np.delete(spec["k"], mask) 300 for md in mds: 301 newmodes[md].append(spec[md][mask,:]) 302 spec[md] = np.delete(spec[md], mask, axis=0) 303 304 self["k"] = np.concatenate(newks) 305 for md in mds: 306 self[md] = np.concatenate(newmodes[md], axis=0) 307 308 return
Combine two spectra with the same times $t$ but unequal momenta $k$.
Parameters
- spec (GaugeSpec): the second spectrum
Raises
- AssertionError: if the times $t$ do not match up.
310 def remove_momenta(self, ind): 311 """ 312 Remove the spectrum at momentum `self["k"][ind]`. 313 314 Parameters 315 ---------- 316 ind : int 317 the index at which to remove the spectrum entry 318 """ 319 self["k"] = np.delete(self["k"], ind) 320 for md in ["Ap", "dAp", "Am", "dAm"]: 321 self[md] = np.delete(self[md], ind, axis=0) 322 return
Remove the spectrum at momentum self["k"][ind].
Parameters
- ind (int): the index at which to remove the spectrum entry
435class SpecSlice(dict): 436 437 def __init__(self, in_dict): 438 super().__init__(in_dict) 439 440 def integrate_slice(self, n : int=0, integrator="simpson", modethr : int=100, 441 epsabs : float=1e-20, epsrel : float=1e-4, interpolator=PchipInterpolator 442 )-> Tuple[np.ndarray, np.ndarray]: 443 x = (n+4)*np.log(self["k"]/self["cut"]) 444 445 helicities = ["p", "m"] 446 integs = np.zeros((3, 2, len(x))) 447 for i, lam in enumerate(helicities): 448 sgn = np.sign(0.5-i) 449 integs[0,i,:] = self._Espec(lam) 450 integs[1,i,:] = self._Bspec(lam) 451 integs[2,i,:] = sgn*self._Gspec(lam) 452 453 res = np.zeros((3,2)) 454 455 for i in range(3): 456 if integrator=="simpson": 457 msk = np.where(x < 0)[0] 458 if len(msk) < modethr: #cannot trust simpsons integration for too few modes. 459 return res 460 x = x[msk] 461 res[i,0] = (self._simpson_integrate( integs[i,0,msk] ,x) 462 + (-1)**n*self._simpson_integrate(integs[i,1,msk], x) ) 463 res[i,1] = 1e-6*res[i,0] 464 465 elif integrator=="quad": 466 resp = self._quad_integrate( integs[i,0,:], x, epsabs, epsrel, interpolator) 467 resm = self._quad_integrate( (-1)**n*integs[i,1,:], x, epsabs, epsrel, interpolator) 468 res[i,:] = resp +resm 469 470 res = 1/(2*np.pi)**2*res/(n+4) 471 472 res[:,1] = abs(res[:,1]/res[:,0]) 473 return res 474 475 def _Espec(self, lam): 476 return abs(self["dA"+lam])**2 477 478 def _Bspec(self, lam): 479 return abs(self["A"+lam])**2 480 481 def _Gspec(self, lam): 482 return (self["A"+lam].conjugate()*self["dA"+lam]).real 483 484 def _simpson_integrate(self, integrand, x): 485 integrand = integrand*np.exp(x) 486 return simpson(integrand, x) 487 488 def _quad_integrate(self, integrand, x, epsabs : float=1e-20, epsrel : float=1e-4, interpolator=PchipInterpolator): 489 msk = np.where(abs(integrand) > 1e-1*max(epsrel*max(abs(integrand)), epsabs))[0] 490 if len(msk) > 0: 491 spl = interpolator(x, np.arcsinh(integrand)) 492 def f(x): return np.sinh(spl(x))*np.exp(x) 493 val, err = quad(f, x[msk][0], 0., epsabs=epsabs, epsrel=epsrel) 494 return np.array([val, err]) 495 else: 496 return np.nan*np.ones((2))
A class representing a spectrum of gauge-field modes at a time $t$.
Instances of this class are created by GaugeSpec.tslice. The main purpose of this class is to integrate the spectrum at time $t$ using integrate_slice.
Attributes
- t (NDArray): the cosmic time coordinates $t$ of the spectrum
- N (NDArray): the $e$-folds as a function of cosmic time, $N(t)$
- k (NDarray): the momenta $k$ at which the spectrum is evaluated
- Ap, Am (NDarray): the mode functions, $\sqrt{2 k} A_\pm(k, t)$
- dAp, dAm (NDarray): the mode-function derivatives, $\sqrt{2/k} \, e^{N(t)}\dot{A}_\pm(k, t)$
440 def integrate_slice(self, n : int=0, integrator="simpson", modethr : int=100, 441 epsabs : float=1e-20, epsrel : float=1e-4, interpolator=PchipInterpolator 442 )-> Tuple[np.ndarray, np.ndarray]: 443 x = (n+4)*np.log(self["k"]/self["cut"]) 444 445 helicities = ["p", "m"] 446 integs = np.zeros((3, 2, len(x))) 447 for i, lam in enumerate(helicities): 448 sgn = np.sign(0.5-i) 449 integs[0,i,:] = self._Espec(lam) 450 integs[1,i,:] = self._Bspec(lam) 451 integs[2,i,:] = sgn*self._Gspec(lam) 452 453 res = np.zeros((3,2)) 454 455 for i in range(3): 456 if integrator=="simpson": 457 msk = np.where(x < 0)[0] 458 if len(msk) < modethr: #cannot trust simpsons integration for too few modes. 459 return res 460 x = x[msk] 461 res[i,0] = (self._simpson_integrate( integs[i,0,msk] ,x) 462 + (-1)**n*self._simpson_integrate(integs[i,1,msk], x) ) 463 res[i,1] = 1e-6*res[i,0] 464 465 elif integrator=="quad": 466 resp = self._quad_integrate( integs[i,0,:], x, epsabs, epsrel, interpolator) 467 resm = self._quad_integrate( (-1)**n*integs[i,1,:], x, epsabs, epsrel, interpolator) 468 res[i,:] = resp +resm 469 470 res = 1/(2*np.pi)**2*res/(n+4) 471 472 res[:,1] = abs(res[:,1]/res[:,0]) 473 return res
Compute the three integrals $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$ for a fixed time $t$ and index $n$.
The integrals can either be computed directly using simpson or quad from scipy.interpolate. When using quad the data for $\sqrt{2 k} A_\pm(k, t)$, $\sqrt{2/k} \, e^{N(t)}\dot{A}_\pm(k, t)$
are interpolated to obtain smooth functions. To avoid this, it is recommended to use simpson.
When using simpson, the integral is only computed if $m > m_{\rm thr}$ momenta $k_i$ satisfy $k < k_{\rm UV}$. Otherwise, the integral is set to zero.
When using quad, the absolute and relative tolerances of the integrator are set by epsabs and epsrel. The interpolation method is defined by interpolator.
Currently, only CubicSpline and PchipInterpolator from scipy.interpolate are supported. The later is preferred as interpolating the oscillatory mode functions can be subject to "overshooting".
See scipy's tutorial for 1-D interpolation for more details.
Parameters
- n (int): the integer $n$ in $\mathcal{F}_\mathcal{X}^{(n)}(t)$ for $\mathcal{X} = \mathcal{E}, \mathcal{B},\mathcal{G}$
- integrator (str):
set the integration method to
simpsonorquad - modethr (int):
set $m_{\rm thr}$ when using
simpson - epsabs (float):
the absolute tolerance of
quad - epsrel (float):
the relative tolerance of
quad - interpolator: the interpolator used to get smooth functions for
quad
Returns
- FMbM (NDArray):
contains [$\mathcal{F}_\mathcal{E}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$, $\mathcal{F}_\mathcal{B}^{(n)}(t)$] and the error estimated by
quad. When usingsimpsonthe error is a dummy output. The shape of the result is (3, 2) with the second index indicating the integral (at 0), or the error (at 1).
498class BaseModeSolver: 499 _ode_dict : dict = {"a": "a", "H":"H", "xi":"xi"} 500 _bd_dict = {} 501 502 cutoff : ClassVar[str] = "kh" 503 r"""The name of $k_{\rm UV}(t)$ in the GEF solution.""" 504 505 atol : ClassVar[float] = 1e-3 506 """The default absolute tolerance used in `scipy.integrate.solve_ivp`""" 507 508 necessary_keys : ClassVar[set] = (["t", "N", "a", "H", "xi"] + [cutoff]) 509 """The class expects these attributes in the `.bgtypes.BGSystem` passed on initialisation.""" 510 511 mode_equation = staticmethod(mode_equation_classic) 512 initialise_in_bd = staticmethod(bd_classic) 513 514 def __init__(self, sys : BGSystem): 515 """ 516 Import the evolution of the background dynamics to configure the solver. 517 518 All values in the BGSystem are treated in numerical units. 519 520 Parameters 521 ---------- 522 sys : BGSystem 523 describes the background evolution 524 525 Raises 526 ------ 527 KeyError: 528 if `sys` is missing a `Val` or `Func` object from `necessary_keys` 529 ValueError: 530 if the keys in `necessary_keys` are not `Val` or `Func` objects. 531 """ 532 og_units = sys.units 533 sys.units = False 534 535 #Check that all necessary keys are there: 536 for key in self.necessary_keys: 537 check = True 538 check = ( (key in sys.variable_names()) 539 or 540 (key in sys.constant_names()) 541 or 542 (key in sys.function_names()) 543 ) 544 if not(check): 545 raise KeyError(f"'sys' needs to own an attribute called '{key}'.") 546 547 #store the relevant background evolution parameters 548 self._t = sys.t.value 549 self._N = sys.N.value 550 kh = getattr(sys, self.cutoff) 551 a = np.exp(self._N) 552 553 self._khf = CubicSpline(self._t, kh) 554 555 self._kwargs_ode = {kwarg:None for kwarg in self._ode_dict.values()} 556 self._kwargs_bd = {kwarg:None for kwarg in self._bd_dict.values()} 557 558 #import the values for the mode equation and interpolate 559 for key in self.necessary_keys: 560 obj = getattr(sys, key) 561 if isinstance(obj, Variable): 562 value = getattr(sys, key).value 563 func = CubicSpline(self._t, value) 564 elif isinstance(obj, Func): 565 arg_vals = [] 566 for arg in obj.args: 567 arg_vals.append(getattr(sys, arg.name)) 568 value = obj(*arg_vals) 569 func = CubicSpline(self._t, value) 570 elif isinstance(obj, Constant): 571 value = getattr(sys, key).value 572 func = CubicSpline(self._t, value*np.ones_like(self._t)) 573 else: 574 raise ValueError(f"'{key}' should refer to either a 'Val' or 'Func' subclass.") 575 576 if key in self._ode_dict.keys(): 577 kwarg = self._ode_dict[key] 578 self._kwargs_ode[kwarg] = func 579 if key in self._bd_dict.values(): 580 kwarg = self._bd_dict[key] 581 self._kwargs_bd[kwarg] = func 582 583 #compute the evolution of conformal time for the phases 584 af = CubicSpline(self._t, a) 585 def deta(t, y): return 1/af(t) 586 587 soleta = solve_ivp(deta, [min(self._t), max(self._t)], np.array([0]), t_eval=self._t) 588 589 self.__eta = soleta.y[0,:] 590 591 #find lowest t value corresponding to kh(t) = 10^4 kh(0) 592 self.__tmin = self._t[np.searchsorted(kh, 10**4*kh[0], "right")] 593 594 sys.units = og_units 595 596 return 597 598 def compute_spectrum(self, nvals : int, t_interval : tuple|NoneType=None, **SolverKwargs) -> GaugeSpec: 599 r""" 600 Evolve a gauge-field spectrum from Bunch-Davies initial conditions. 601 602 Evolve the mode functions $A_\lambda(t,k)$ and its derivative in time for $n$ modes between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. 603 The $n$ evolved modes are more densly spaced when $\log k_{\rm UV}(t)$ increases more slowly to ensure a higher density of modes 604 crossing the horizon when backreaction effects are relevant. 605 606 Parameters 607 ---------- 608 nvals : int 609 The number of modes $n$ 610 t_interval : tuple or None 611 set $t_{\rm min}$ and $t_{\rm max}$. If None, $t_{\rm min}$ is given by $10^4 k_{\rm UV}(t_{min}) = k_{\rm UV}(0)$ and $t_{\rm max} = \max t$. 612 **SolverKwargs : kwargs 613 tolerance parameters`atol` and `rtol` passed to `solve_ivp` (default: `atol=self.atol`, `rtol=1e-5`) 614 615 Returns 616 ------- 617 spec : GaugeSpec 618 the gauge-field spectrum 619 """ 620 621 if t_interval is None: 622 t_interval = (self.__tmin, max(self._t)) 623 ks, tstart = self._find_tinit_bd(self._create_k_array(nvals, t_interval), mode="k") 624 625 modes = np.array([self._evolve_from_bd(k, tstart[i], **SolverKwargs) 626 for i, k in enumerate(ks)]) 627 628 spec = GaugeSpec({"t":self._t, "N":self._N, "k":ks, 629 "Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 630 631 return spec 632 633 def update_spectrum(self, spec : GaugeSpec, tstart : float, **SolverKwargs) -> GaugeSpec: 634 r""" 635 Update an existing gauge-field spectrum starting from $t_{\rm start}$ 636 637 Starting from the modes stored in `GaugeSpec`, the function re-evaluates the evolution starting from $t_{\rm start}$. 638 Additional gauge-field modes are evolved starting from Bunch–Davies to account for new modes crossing the horizon 639 at times beyond the original range covered by the input spectrum. 640 641 Parameters 642 ---------- 643 spec : GaugeSpec 644 the spectrum which is to be updated 645 tstart : float 646 the starting time $t_{\rm start}$ 647 **SolverKwargs : kwargs 648 as in `compute_spectrum` 649 650 Returns 651 ------- 652 spec : GaugeSpec 653 the updated gauge-field spectrum 654 """ 655 656 indstart = np.searchsorted(self._t, tstart, "left") 657 teval = self._t[indstart:] 658 Neval = self._N[indstart:] 659 660 tend = teval[-1] 661 indstart = np.searchsorted(spec["t"], teval[0], "left") 662 startspec = spec.tslice(indstart) 663 tstart = startspec["t"] 664 665 #keep mode-evolution from old spectrum for modes with k < 10*kh(tstart) 666 old = np.where(spec["k"] < 10*self._khf(tstart)) 667 new = np.where(spec["k"] >= 10*self._khf(tstart)) 668 669 #Remove modes which need to be renewed from old spectrum: 670 spec.remove_momenta(new) 671 672 #add new modes, adjusting for longer time-span: 673 #rethink this! It seems like overkill 674 n_newmodes = int(len(new)*max((teval[-1] - teval[0])/(max(spec["t"]) - tstart), 1)) 675 676 #Update evolution of modes in spec: 677 kold, tvac = self._find_tinit_bd(spec["k"][old], mode="k") 678 679 updatespec={"t":teval, "N":Neval, "k":kold} 680 681 modes = [] 682 for i, k in enumerate(kold): 683 if tvac[i] > teval[0]: 684 modes.append( self._evolve_from_bd(k, tvac[i], **SolverKwargs) ) 685 else: 686 yini = np.array( 687 [startspec["Ap"][i].real, startspec["dAp"][i].real, 688 startspec["Ap"][i].imag, startspec["dAp"][i].imag, 689 startspec["Am"][i].real, startspec["dAm"][i].real, 690 startspec["Am"][i].imag, startspec["dAm"][i].imag] 691 ) 692 693 modes.append( self._evolve_mode(tstart, yini, k, teval, **SolverKwargs) ) 694 695 modes = np.array(modes) 696 697 updatespec.update({"Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 698 699 spec.merge_spectra(GaugeSpec(updatespec)) 700 701 if n_newmodes > 0: 702 newspec = self.compute_spectrum(n_newmodes, t_interval=(tstart, tend)) 703 #Add new modes 704 spec.add_momenta(newspec) 705 706 return spec 707 708 """ 709 def EvolveSpectrum(self, spec : GaugeSpec, Nstart, Nstep : float=0.1, atol : float|None=None, rtol : float=1e-5) -> GaugeSpec: 710 Neval = np.arange(Nstart, max(self.__N), Nstep) 711 teval = CubicSpline(self.__N, self.__t)(Neval) 712 713 indstart = np.where(spec["N"]<Nstart)[0][-1] 714 startspec = spec.tslice(indstart) 715 716 klen = len(spec["k"]) 717 718 vecode = np.vectorize(lambda t, y, k: self.mode_equation(t, y, k, **self._ode_kwargs), 719 excluded={0, "t"}, 720 signature="(8,n),(n)->(8,n)", 721 ) 722 def ode(t, y): 723 #(k,8) to reshape correctly 724 #transposing to match signature of vecode 725 y = y.reshape(klen,8).T 726 #transposing result s.t. dydt.shape=(k,8) 727 dydt = vecode(t, y, spec["k"]).T 728 #reshape is correct again 729 return dydt.reshape(8*klen) 730 731 spec.merge_spectra(GaugeSpec(newspec)) 732 733 return spec 734 """ 735 736 737 def _evolve_from_bd(self, k : float, tstart : float, 738 atol : float|None=None, rtol : float=1e-5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: 739 """ 740 Evolve a mode with momentum $k$ starting from Bunch-Davies initial conditions. 741 742 Parameters 743 ---------- 744 k : float 745 the momentum $k$ 746 tstart : float 747 the corresponding initialisation time, $t_{\rm init}$ 748 atol : float or None 749 `atol` used by `solve_ivp` if None, use `self.atol` 750 rtol : float 751 `rtol` used by `solve_ivp` 752 753 Returns 754 ------- 755 yp : NDArray 756 the positive helicity mode 757 dyp : NDArray 758 the derivative of the positive helicity mode 759 ym : NDArray 760 the positive helicity mode 761 dym : NDArray 762 the derivative of the negative helicity mode 763 """ 764 765 #Initial conditions for y and dydt for both helicities (rescaled appropriately) 766 yini = self.initialise_in_bd(tstart, k, **self._kwargs_bd) 767 768 teval = self._t 769 770 istart = np.searchsorted(teval, tstart, "right") 771 772 yp, dyp, ym, dym = self._evolve_mode(tstart, yini, k, teval[istart:], atol, rtol) 773 774 #conformal time needed for relative phases 775 eta = self.__eta 776 777 #the mode was in vacuum before tstart 778 yvac = np.array([self.initialise_in_bd(t, k, **self._kwargs_bd) for t in teval[:istart]]).T 779 phasevac = (np.exp(-1j*k*eta[:istart])) 780 vac = yvac * phasevac 781 782 #Create array of mode evolution stringing together vacuum and non-vacuum time evolutions to get evolution from t0 to tend 783 yp = np.concatenate([(vac[0,:] + 1j*vac[2,:]), yp*np.exp(-1j*k*eta[istart])]) 784 dyp = np.concatenate([(vac[1,:] + 1j*vac[3,:]), dyp*np.exp(-1j*k*eta[istart])]) 785 ym = np.concatenate([(vac[4,:] + 1j*vac[6,:]), ym*np.exp(-1j*k*eta[istart])]) 786 dym = np.concatenate([(vac[5,:] + 1j*vac[7,:]), dym*np.exp(-1j*k*eta[istart])]) 787 788 return yp, dyp, ym, dym 789 790 def _evolve_mode(self, tini : float, yini : np.ndarray, k : float, teval : np.ndarray, 791 atol : float|None=None, rtol : float=1e-5): 792 """ 793 Evolve a mode of momentum $k$ from $t_{\rm ini}$. 794 795 Parameters 796 ---------- 797 tini : float 798 the initial time $t_{\rm ini}$ 799 yini : NDArray 800 (8,) array containing the initial data 801 k : float 802 the momentum $k$ 803 teval : NDArray 804 the times at which the returned solution is evaluated 805 atol : float or None 806 `atol` used by `solve_ivp` if None, use `self.atol` 807 rtol : float 808 `rtol` used by `solve_ivp` 809 810 Returns 811 ------- 812 yp : NDArray 813 the positive helicity mode 814 dyp : NDArray 815 the derivative of the positive helicity mode 816 ym : NDArray 817 the positive helicity mode 818 dym : NDArray 819 the derivative of the negative helicity mode 820 """ 821 #Define ODE 822 def ode(t, y): return self.mode_equation(t, y, k, **self._kwargs_ode) 823 824 if atol is None: 825 atol = self.atol 826 827 #Solve differential equation from tstart to tmax 828 sol = solve_ivp(ode, [tini, max(teval)], yini, t_eval=teval, method="RK45", atol=atol, rtol=rtol) 829 830 yp = (sol.y[0,:] + 1j*sol.y[2,:]) 831 dyp = (sol.y[1,:] + 1j*sol.y[3,:]) 832 ym = (sol.y[4,:] + 1j*sol.y[6,:]) 833 dym = (sol.y[5,:] + 1j*sol.y[7,:]) 834 835 return yp, dyp, ym, dym 836 837 838 def _create_k_array(self, nvals : int, t_interval : tuple) -> np.ndarray: 839 r""" 840 Create an array of $n$ momenta 841 842 The $n$ modes are generated between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. 843 First, $m$ of modes $k = k_{\rm UV}$ with t evenly spaced between $t_{\rm min}$ and $t_{\rm max}$ are generated. 844 As $k_{\rm UV}$ is monotonic but not strictly monotonic, $m\leq n$. To fill up to $n$ modes, $n-m$ additional modes are 845 created between the existing times $(t_{i},t_{i+1})$ moving backwards from $t_{\rm max}$ to favour larger momenta. 846 847 Parameters 848 ---------- 849 nvals : int 850 The size of the output array 851 t_interval : tuple 852 sets $t_{\rm min}$ and $t_{\rm max}$ 853 854 Returns 855 ------- 856 NDArray 857 an array of momenta with size nvals 858 """ 859 860 #create an array of values log(10*kh(t)) 861 logks = np.round( np.log(10*self._khf(np.linspace(t_interval[0], t_interval[1], nvals))), 3) 862 #filter out all values that are repeating (kh is not strictly monotonous) 863 logks = np.unique((logks)) 864 865 #fill up the array of ks values with additional elements between gaps, favouring larger k 866 while len(logks) < nvals: 867 num_newk = nvals - len(logks) 868 if num_newk >= len(logks): 869 newvals = (logks[1:] + logks[:-1])/2 870 else: 871 newvals = (logks[-num_newk:] + logks[-num_newk-1:-1])/2 872 logks = np.sort(np.concatenate([logks, newvals])) 873 return np.exp(logks) 874 875 def _find_tinit_bd(self, init : np.ndarray, mode : str="k") -> Tuple[np.ndarray, np.ndarray]: 876 """ 877 Determines the pair of $k$ and $t$ satisfying $k = 10^(5/2)k_h(t)$. 878 879 Depending on `mode`, `init` may be a time coordinate (`mode='t'`), $e$-folds (`mode='N'`) or momentum (`mode='k'`). 880 881 Parameters 882 ---------- 883 init : array 884 the input array (t, N, or k) 885 mode : str 886 indicate the type of `init` 887 888 Returns 889 ------- 890 ks : NDarray 891 an array of momenta 892 tstart : NDarray 893 an array of times 894 895 Raises 896 ------ 897 KeyError 898 if `mode` is not 't, 'k' or 'N' 899 """ 900 901 if mode=="t": 902 tstart = init 903 ks = 10**(5/2)*self._khf(tstart) 904 905 elif mode=="k": 906 ks = init 907 908 tstart = [] 909 for k in ks: 910 ttmp = self._t[np.searchsorted(10**(5/2)*self._khf(self._t), k, "right")] 911 tstart.append(ttmp) 912 tstart = np.array(tstart) 913 914 elif mode=="N": 915 tstart = CubicSpline(self._N, self._t)(init) 916 ks = 10**(5/2)*self._khf(tstart) 917 918 else: 919 raise KeyError("'mode' must be 't', 'k' or 'N'") 920 921 return ks, tstart
A class used to compute gauge-field modes evolving on a time-dependent background.
This class is used to evolve the gauge-field modes $A_\pm(t,k)$ as defined by mode_equation.
To do so, the evolution of the time-dependent background is obtained from a GEF solution.
The modes are initialized deep inside the Bunch–Davies vacuum as given by initialise_in_bd.
Numerically, the initialization time is implicitly defined by the condition $k = 10^{5/2} k_{\rm UV}(t_{\rm ini})$, with $k_{\rm UV}(t)$ obtained from the GEF solution.
At times $t < t_{\rm ini}$ the mode is assumed to be in Bunch–Davies.
The mode equations are solved with an explicit Runge–Kutta of order 5(4), which is implemented in scipy.integrate.solve_ivp.
For creating a custom subclass of BaseModeSolver with user-specified mode equation and initial conditions, you can use the class factory ModeSolver.
514 def __init__(self, sys : BGSystem): 515 """ 516 Import the evolution of the background dynamics to configure the solver. 517 518 All values in the BGSystem are treated in numerical units. 519 520 Parameters 521 ---------- 522 sys : BGSystem 523 describes the background evolution 524 525 Raises 526 ------ 527 KeyError: 528 if `sys` is missing a `Val` or `Func` object from `necessary_keys` 529 ValueError: 530 if the keys in `necessary_keys` are not `Val` or `Func` objects. 531 """ 532 og_units = sys.units 533 sys.units = False 534 535 #Check that all necessary keys are there: 536 for key in self.necessary_keys: 537 check = True 538 check = ( (key in sys.variable_names()) 539 or 540 (key in sys.constant_names()) 541 or 542 (key in sys.function_names()) 543 ) 544 if not(check): 545 raise KeyError(f"'sys' needs to own an attribute called '{key}'.") 546 547 #store the relevant background evolution parameters 548 self._t = sys.t.value 549 self._N = sys.N.value 550 kh = getattr(sys, self.cutoff) 551 a = np.exp(self._N) 552 553 self._khf = CubicSpline(self._t, kh) 554 555 self._kwargs_ode = {kwarg:None for kwarg in self._ode_dict.values()} 556 self._kwargs_bd = {kwarg:None for kwarg in self._bd_dict.values()} 557 558 #import the values for the mode equation and interpolate 559 for key in self.necessary_keys: 560 obj = getattr(sys, key) 561 if isinstance(obj, Variable): 562 value = getattr(sys, key).value 563 func = CubicSpline(self._t, value) 564 elif isinstance(obj, Func): 565 arg_vals = [] 566 for arg in obj.args: 567 arg_vals.append(getattr(sys, arg.name)) 568 value = obj(*arg_vals) 569 func = CubicSpline(self._t, value) 570 elif isinstance(obj, Constant): 571 value = getattr(sys, key).value 572 func = CubicSpline(self._t, value*np.ones_like(self._t)) 573 else: 574 raise ValueError(f"'{key}' should refer to either a 'Val' or 'Func' subclass.") 575 576 if key in self._ode_dict.keys(): 577 kwarg = self._ode_dict[key] 578 self._kwargs_ode[kwarg] = func 579 if key in self._bd_dict.values(): 580 kwarg = self._bd_dict[key] 581 self._kwargs_bd[kwarg] = func 582 583 #compute the evolution of conformal time for the phases 584 af = CubicSpline(self._t, a) 585 def deta(t, y): return 1/af(t) 586 587 soleta = solve_ivp(deta, [min(self._t), max(self._t)], np.array([0]), t_eval=self._t) 588 589 self.__eta = soleta.y[0,:] 590 591 #find lowest t value corresponding to kh(t) = 10^4 kh(0) 592 self.__tmin = self._t[np.searchsorted(kh, 10**4*kh[0], "right")] 593 594 sys.units = og_units 595 596 return
Import the evolution of the background dynamics to configure the solver.
All values in the BGSystem are treated in numerical units.
Parameters
- sys (BGSystem): describes the background evolution
Raises
- KeyError:: if
sysis missing aValorFuncobject fromnecessary_keys - ValueError:: if the keys in
necessary_keysare notValorFuncobjects.
The class expects these attributes in the .bgtypes.BGSystem passed on initialisation.
30def mode_equation_classic(t: float, y : np.ndarray, k : float, a : Callable, 31 xi : Callable, H : Callable) -> np.ndarray: 32 r""" 33 Mode equation for pure axion inflation: 34 35 $$\ddot{A}_\lambda(t,k) + H \dot{A}_\lambda(t,k) + \left[\left(\frac{k}{a}\right)^2 - 2\lambda \left(\frac{k}{a}\right) \xi H \right]A_\lambda(t,k) = 0 \, .$$ 36 37 Parameters 38 ---------- 39 t : float 40 cosmic time $t$ 41 y : NDArray 42 $\sqrt{2k} A_\lambda(t_{\rm init},k)$ and $a\sqrt{2/k} \dot{A}_\lambda(t_{\rm init},k)$ 43 k : float 44 comoving momentum $k$ 45 a : Callable 46 scale factor as function of time, $a(t)$ 47 xi : Callable 48 instability parameter as function of time, $\xi(t)$ 49 H : Callable 50 Hubble rate as function of time, $H(t)$ 51 52 Returns 53 ------- 54 dydt : array 55 an array of time derivatives of y 56 """ 57 58 dydt = np.zeros_like(y) 59 60 dis1 = k / a(t) 61 dis2 = 2*H(t)*xi(t) 62 63 #positive helicity 64 lam = 1. 65 #Real Part 66 dydt[0] = y[1]*dis1 67 dydt[1] = -(dis1 - lam * dis2) * y[0] 68 69 #Imaginary Part 70 dydt[2] = y[3]*dis1 71 dydt[3] = -(dis1 - lam * dis2) * y[2] 72 73 #negative helicity 74 lam = -1. 75 #Real Part 76 dydt[4] = y[5]*dis1 77 dydt[5] = -(dis1 - lam * dis2) * y[4] 78 79 #Imaginary Part 80 dydt[6] = y[7]*dis1 81 dydt[7] = -(dis1 - lam * dis2) * y[6] 82 83 return dydt
Mode equation for pure axion inflation:
$$\ddot{A}_\lambda(t,k) + H \dot{A}_\lambda(t,k) + \left[\left(\frac{k}{a}\right)^2 - 2\lambda \left(\frac{k}{a}\right) \xi H \right]A_\lambda(t,k) = 0 \, .$$
Parameters
- t (float): cosmic time $t$
- y (NDArray): $\sqrt{2k} A_\lambda(t_{\rm init},k)$ and $a\sqrt{2/k} \dot{A}_\lambda(t_{\rm init},k)$
- k (float): comoving momentum $k$
- a (Callable): scale factor as function of time, $a(t)$
- xi (Callable): instability parameter as function of time, $\xi(t)$
- H (Callable): Hubble rate as function of time, $H(t)$
Returns
- dydt (array): an array of time derivatives of y
9def bd_classic(t: float, k : float) -> np.ndarray: 10 r""" 11 Returns gauge-field modes in Bunch–Davies vacuum: 12 13 $$A_\lambda(k,t) \sim \frac{1}{\sqrt{2k}}exp{(-i \eta(t) k)}\, , \qquad -\eta(t) k \gg 1 \, .$$ 14 15 Parameters 16 ---------- 17 t : float 18 time of initialisation $t_{\rm init}$ 19 k : float 20 comoving momentum $k$ 21 22 Returns 23 ------- 24 y : NDArray 25 $\sqrt{2k} A_\lambda(t_{\rm init},k)$ and $a\sqrt{2/k} \dot{A}_\lambda(t_{\rm init},k)$ in Bunch–Davies 26 27 """ 28 return np.array([1., 0, 0, -1., 1, 0, 0, -1.])
Returns gauge-field modes in Bunch–Davies vacuum:
$$A_\lambda(k,t) \sim \frac{1}{\sqrt{2k}}exp{(-i \eta(t) k)}\, , \qquad -\eta(t) k \gg 1 \, .$$
Parameters
- t (float): time of initialisation $t_{\rm init}$
- k (float): comoving momentum $k$
Returns
- y (NDArray): $\sqrt{2k} A_\lambda(t_{\rm init},k)$ and $a\sqrt{2/k} \dot{A}_\lambda(t_{\rm init},k)$ in Bunch–Davies
598 def compute_spectrum(self, nvals : int, t_interval : tuple|NoneType=None, **SolverKwargs) -> GaugeSpec: 599 r""" 600 Evolve a gauge-field spectrum from Bunch-Davies initial conditions. 601 602 Evolve the mode functions $A_\lambda(t,k)$ and its derivative in time for $n$ modes between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. 603 The $n$ evolved modes are more densly spaced when $\log k_{\rm UV}(t)$ increases more slowly to ensure a higher density of modes 604 crossing the horizon when backreaction effects are relevant. 605 606 Parameters 607 ---------- 608 nvals : int 609 The number of modes $n$ 610 t_interval : tuple or None 611 set $t_{\rm min}$ and $t_{\rm max}$. If None, $t_{\rm min}$ is given by $10^4 k_{\rm UV}(t_{min}) = k_{\rm UV}(0)$ and $t_{\rm max} = \max t$. 612 **SolverKwargs : kwargs 613 tolerance parameters`atol` and `rtol` passed to `solve_ivp` (default: `atol=self.atol`, `rtol=1e-5`) 614 615 Returns 616 ------- 617 spec : GaugeSpec 618 the gauge-field spectrum 619 """ 620 621 if t_interval is None: 622 t_interval = (self.__tmin, max(self._t)) 623 ks, tstart = self._find_tinit_bd(self._create_k_array(nvals, t_interval), mode="k") 624 625 modes = np.array([self._evolve_from_bd(k, tstart[i], **SolverKwargs) 626 for i, k in enumerate(ks)]) 627 628 spec = GaugeSpec({"t":self._t, "N":self._N, "k":ks, 629 "Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 630 631 return spec
Evolve a gauge-field spectrum from Bunch-Davies initial conditions.
Evolve the mode functions $A_\lambda(t,k)$ and its derivative in time for $n$ modes between $k_{\rm UV}(t_{\rm min})$ and $k_{\rm UV}(t_{\rm max})$. The $n$ evolved modes are more densly spaced when $\log k_{\rm UV}(t)$ increases more slowly to ensure a higher density of modes crossing the horizon when backreaction effects are relevant.
Parameters
- nvals (int): The number of modes $n$
- t_interval (tuple or None): set $t_{\rm min}$ and $t_{\rm max}$. If None, $t_{\rm min}$ is given by $10^4 k_{\rm UV}(t_{min}) = k_{\rm UV}(0)$ and $t_{\rm max} = \max t$.
- **SolverKwargs (kwargs):
tolerance parameters
atolandrtolpassed tosolve_ivp(default:atol=self.atol,rtol=1e-5)
Returns
- spec (GaugeSpec): the gauge-field spectrum
633 def update_spectrum(self, spec : GaugeSpec, tstart : float, **SolverKwargs) -> GaugeSpec: 634 r""" 635 Update an existing gauge-field spectrum starting from $t_{\rm start}$ 636 637 Starting from the modes stored in `GaugeSpec`, the function re-evaluates the evolution starting from $t_{\rm start}$. 638 Additional gauge-field modes are evolved starting from Bunch–Davies to account for new modes crossing the horizon 639 at times beyond the original range covered by the input spectrum. 640 641 Parameters 642 ---------- 643 spec : GaugeSpec 644 the spectrum which is to be updated 645 tstart : float 646 the starting time $t_{\rm start}$ 647 **SolverKwargs : kwargs 648 as in `compute_spectrum` 649 650 Returns 651 ------- 652 spec : GaugeSpec 653 the updated gauge-field spectrum 654 """ 655 656 indstart = np.searchsorted(self._t, tstart, "left") 657 teval = self._t[indstart:] 658 Neval = self._N[indstart:] 659 660 tend = teval[-1] 661 indstart = np.searchsorted(spec["t"], teval[0], "left") 662 startspec = spec.tslice(indstart) 663 tstart = startspec["t"] 664 665 #keep mode-evolution from old spectrum for modes with k < 10*kh(tstart) 666 old = np.where(spec["k"] < 10*self._khf(tstart)) 667 new = np.where(spec["k"] >= 10*self._khf(tstart)) 668 669 #Remove modes which need to be renewed from old spectrum: 670 spec.remove_momenta(new) 671 672 #add new modes, adjusting for longer time-span: 673 #rethink this! It seems like overkill 674 n_newmodes = int(len(new)*max((teval[-1] - teval[0])/(max(spec["t"]) - tstart), 1)) 675 676 #Update evolution of modes in spec: 677 kold, tvac = self._find_tinit_bd(spec["k"][old], mode="k") 678 679 updatespec={"t":teval, "N":Neval, "k":kold} 680 681 modes = [] 682 for i, k in enumerate(kold): 683 if tvac[i] > teval[0]: 684 modes.append( self._evolve_from_bd(k, tvac[i], **SolverKwargs) ) 685 else: 686 yini = np.array( 687 [startspec["Ap"][i].real, startspec["dAp"][i].real, 688 startspec["Ap"][i].imag, startspec["dAp"][i].imag, 689 startspec["Am"][i].real, startspec["dAm"][i].real, 690 startspec["Am"][i].imag, startspec["dAm"][i].imag] 691 ) 692 693 modes.append( self._evolve_mode(tstart, yini, k, teval, **SolverKwargs) ) 694 695 modes = np.array(modes) 696 697 updatespec.update({"Ap":modes[:,0,:], "dAp":modes[:,1,:], "Am":modes[:,2,:], "dAm":modes[:,3,:]}) 698 699 spec.merge_spectra(GaugeSpec(updatespec)) 700 701 if n_newmodes > 0: 702 newspec = self.compute_spectrum(n_newmodes, t_interval=(tstart, tend)) 703 #Add new modes 704 spec.add_momenta(newspec) 705 706 return spec 707 708 """ 709 def EvolveSpectrum(self, spec : GaugeSpec, Nstart, Nstep : float=0.1, atol : float|None=None, rtol : float=1e-5) -> GaugeSpec: 710 Neval = np.arange(Nstart, max(self.__N), Nstep) 711 teval = CubicSpline(self.__N, self.__t)(Neval) 712 713 indstart = np.where(spec["N"]<Nstart)[0][-1] 714 startspec = spec.tslice(indstart) 715 716 klen = len(spec["k"]) 717 718 vecode = np.vectorize(lambda t, y, k: self.mode_equation(t, y, k, **self._ode_kwargs), 719 excluded={0, "t"}, 720 signature="(8,n),(n)->(8,n)", 721 ) 722 def ode(t, y): 723 #(k,8) to reshape correctly 724 #transposing to match signature of vecode 725 y = y.reshape(klen,8).T 726 #transposing result s.t. dydt.shape=(k,8) 727 dydt = vecode(t, y, spec["k"]).T 728 #reshape is correct again 729 return dydt.reshape(8*klen) 730 731 spec.merge_spectra(GaugeSpec(newspec)) 732 733 return spec 734 """
Update an existing gauge-field spectrum starting from $t_{\rm start}$
Starting from the modes stored in GaugeSpec, the function re-evaluates the evolution starting from $t_{\rm start}$.
Additional gauge-field modes are evolved starting from Bunch–Davies to account for new modes crossing the horizon
at times beyond the original range covered by the input spectrum.
Parameters
- spec (GaugeSpec): the spectrum which is to be updated
- tstart (float): the starting time $t_{\rm start}$
- **SolverKwargs (kwargs):
as in
compute_spectrum
Returns
- spec (GaugeSpec): the updated gauge-field spectrum
924def ModeSolver(new_mode_eq : Callable, ode_keys : list[str], new_bd_init : Callable, init_keys : list[str], new_cutoff : str="kh", default_atol : float=1e-3): 925 class CustomModeSolver(BaseModeSolver): 926 """ 927 A subclass of `BaseModeSolver` new mode equation and initial condition adapted to a new GEF model 928 929 It Inherits all methods from `BaseModeSolver` but changes the attributes: 930 - `BaseModeSolver.mode_equation` 931 - `BaseModeSolver.initialise_in_bd` 932 - `BaseModeSolver.necessary_keys` 933 - `BaseModeSolver.cutoff` 934 - `BaseModeSolver.atol` 935 936 This entails that `compute_spectrum` will now evolve modes according to `initialise_in_bd` and `mode_equation`. 937 """ 938 939 #Overwrite class attibutes of ModeByMode with new mode equations, boundary conditions and default tolerances. 940 mode_equation = staticmethod(new_mode_eq) 941 _ode_dict = {attr.name:kwarg for kwarg, attr in ode_keys.items()} 942 943 initialise_in_bd = staticmethod(new_bd_init) 944 _bd_dict = {attr.name:kwarg for kwarg, attr in init_keys.items()} 945 cutoff = new_cutoff 946 947 necessary_keys = list( {"t", "N", new_cutoff}.union(set(_ode_dict.keys())).union(set(_bd_dict.keys())) ) 948 949 atol=default_atol 950 951 CustomModeSolver.__qualname__ = "ModeSolver" 952 CustomModeSolver.__module__ = __name__ 953 return CustomModeSolver
Create a subclass of BaseModeSolver with custom mode equation and initial conditions.
In case your GEF model does not follow the pre-defined gauge-field mode equation BaseModeSolver.mode_equation,
or initial conditions, BaseModeSolver.initialise_in_bd this method defines a new subclass with
these methods replaced by new_mode_eq and new_bd_init.
The method new_mode_eq needs to obey the following rules:
- The call signature is
f(t,y,k,**kwargs) - The arguments
t/kexpect floats representing time / momentum - The argument
yexpects anumpy.ndarrrayof shape (8,) with indices- 0 & 2 / 4 & 6: real & imaginary part of $\sqrt{2k} A_\lambda(t,k)$ for $\lambda = 1 \, / -1$
- 1 & 3 / 5 & 7: real & imaginary part of $\sqrt{2/k}\, a \dot{A}_\lambda(t,k)$ for $\lambda = 1 \, / -1$
- The kwargs are functions of the argument
t. - The return is the time derivative of
y
The method new_bd_init needs to obey the following rules:
- The call signature is
f(t,k,**kwargs) - The arguments
t/kexpect floats representing time / momentum - The kwargs are functions of the argument
t. - The return is a
numpy.ndarrrayof shape (8,) with indices- 0 & 2 / 4 & 6: real & imaginary part of $\sqrt{2k} A_\lambda(t_{\rm init},k)$ for $\lambda = 1 \, / -1$
- 1 & 3 / 5 & 7: real & imaginary part of $\sqrt{2/k}\, a \dot{A}_\lambda(t_{\rm init},k)$ for $\lambda = 1 \, / -1$
The lists ode_keys and init_keys are handled as follows:
ode_keysandinit_keysneed to contain the keys associated to the respective kwargs ofnew_mode_eqandnew_bd_init.- These keys correspond to names of
geff.bgtypes.Variableobjects belonging to ageff.bgtypes.BGSystempassed to the class upon initialisation. The respectiveVariableobjects are interpolated to obtain functions of time. These functions are then passed to to the corresponding keyword arguments ofnew_mode_eqandnew_bd_init. ode_keysandinit_keysare added toBaseModeSolver.necessary_keysof the new subclass.
The BaseModeSolver.cutoff and BaseModeSolver.atol attributes can also be adjusted.
Parameters
- new_mode_eq (Callable): a new mode equation
- ode_keys (list of str):
the non-standard keywords of
new_mode_eq - new_bd_init (Callable): a new mode bd initial condition
- init_keys (list of str):
the non-standard keywords of
new_bd_init - new_cutoff (str):
the new
cutoffattribute of the subclass - default_atol (float): the default absolute tolerance used by the subclass
Returns
- NewModeSolver: the newly defined subclass of
BaseModeSolver
Example
import numpy as np
from geff.bgtypes import BGSystem, define_var, t, N, kh
# Define a new mode equation:
def custom_mode_eq(t, y, k, a, X, Y):
#create a return array of the right shape
dydt = np.ones_like(y)
#compute real-part time derivatives for positive modes
dydt[0] = k / a(t) * y[1] # a is a function of t.
dydt[1] = X(t)/Y(t)*y[0] # X and Y are functions of t.
#compute imag-part time derivatives for positive modes
...
#compute real-part time derivatives for negative modes
...
...
return dydt
# Define a new initial condition for the modes:
def custom_bd_init(t, k, alpha):
y = alpha(t)*np.array([...]) # alpha is a function of t.
return y
# the kwargs of custom_mode_eq are 'a', 'X' and 'Y':
custom_ode_keys = ['a', 'X', 'Y']
# the kwarg of custom_bd_init is 'alpha':
custom_init_keys = ['alpha']
# Define the custom mode solver using the class factory:
CustomModeSolver = ModeSolver(custom_mode_eq, custom_ode_keys,
custom_bd_init, custom_init_keys)
# To initialise CustomModeSolver we need a BGSystem.
# Its Variables need to have the right names however:
# The default: 't', 'N', 'kh' were loaded from geff.bgtypes
# Because of custom_mode_eq we also need 'a', 'X', 'Y'
a = define_var("a", 0, 0)
X = define_var("X", 2, 0)
Y = define_var("Y", 2, 0)
# For custom_bd_init we need 'alpha'
alpha = define_var("alpha", 0, 0)
# When in doubt, consult necessary_keys:
print(CustomModeSolver.necessary_keys)
# We create the BGSystem and initialise all its values:
sys = BGSystem({t, N, kh, a, X, Y, alpha}, 1e-6, 1)
sys.initialise("t")(...)
...
# The values in sys can now be used to initialise CustomModeSolver
MbM = CustomModeSolver(sys)
# Let's compute a spectrum using the new setup:
spec = MbM.compute_spectrum(100)
...