import numpy as np
from astropy.timeseries import LombScargle
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from warnings import warn
from uncertainties import ufloat
import corner
import multiprocessing
from joblib import delayed
import joblib
from tqdm.auto import tqdm
from scipy import stats
__all__ = ['Fourier','MultiHarmonicFitter','MultiFrequencyFitter']
class ProgressParallel(joblib.Parallel):
def __init__(self, total=None, **kwds):
self.total = total
super().__init__(**kwds)
def __call__(self, *args, **kwargs):
with tqdm() as self._pbar:
return joblib.Parallel.__call__(self, *args, **kwargs)
def print_progress(self):
if self.total is None:
self._pbar.total = self.n_dispatched_tasks
else:
self._pbar.total = self.total
self._pbar.n = self.n_completed_tasks
self._pbar.refresh()
def is_outlier(points, thresh=3.5):
if len(points.shape) == 1:
points = points[:,None]
median = np.median(points, axis=0)
diff = np.sum((points - median)**2, axis=-1)
diff = np.sqrt(diff)
med_abs_deviation = np.median(diff)
modified_z_score = 0.6745 * diff / med_abs_deviation
return modified_z_score > thresh
def sort_by_amplitude(pfit,perr):
ncomponents = int((len(pfit)-1)//3)
pfit = np.asarray(pfit)
perr = np.asarray(perr)
# Separate values by type
freqs = pfit[:ncomponents]
amps = pfit[ncomponents:2*ncomponents]
phases = pfit[2*ncomponents:-1]
freqs_err = perr[:ncomponents]
amps_err = perr[ncomponents:2*ncomponents]
phases_err = perr[2*ncomponents:-1]
# Sort values by amplitude
keyorder = np.argsort(amps)[::-1]
# Sort values
freqs = list(freqs[keyorder])
amps = list(amps[keyorder])
phases = list(phases[keyorder])
freqs_err = list(freqs_err[keyorder])
amps_err = list(amps_err[keyorder])
phases_err = list(phases_err[keyorder])
# Rearrange results by amplitude
pfit = freqs + amps + phases + [pfit[-1]]
perr = freqs_err + amps_err + phases_err + [perr[-1]]
return pfit, perr
class BaseFitter():
'''
Attributes
----------
t : array-like
Time values of the light curve.
y : array-like
Flux/mag values of the light curve.
error : array-like, optional
Flux/mag errors values of the light curve. If not given, Fourier parameter
errors will be less reliable. In this case use `error_estimation`.
'''
def __init__(self, t, y, error=None):
t = np.asarray(t,dtype=float)
y = np.asarray(y,dtype=float)
if error is not None:
error = np.asarray(error,dtype=float)
goodpts = np.isfinite(t)
goodpts &= np.isfinite(y)
if error is not None:
goodpts &= np.isfinite(error)
self.t = t[goodpts]
self.y = y[goodpts]
if error is not None:
self.error = error[goodpts]
else:
self.error = error
def _func(self, time, amp, best_freq, phase, kind='sin'):
# sin or cos funtion to be fitted
if kind=='sin':
y = amp*np.sin(2*np.pi*best_freq*time + phase )
elif kind=='cos':
y = amp*np.cos(2*np.pi*best_freq*time + phase )
else:
raise TypeError('%s format does not exist. Select \'sin\' or \'cos\'.' % str(kind))
return y
def _analytic_uncertainties(self,time,residual,amp):
N = len(time)
T = np.ptp(time)
sigma_m = np.std(residual)
sigma_f = np.sqrt(6/N) * 1/(np.pi*T) * sigma_m/amp
sigma_a = np.sqrt(2/N) * sigma_m
sigma_phi = 1/(2*np.pi) * np.sqrt(2/N) * sigma_m/amp
return sigma_f,sigma_a,sigma_phi
[docs]class Fourier(BaseFitter):
'''
Attributes
----------
t : array-like
Time values of the light curve.
y : array-like
Flux/mag values of the light curve.
error : array-like, optional
Flux/mag errors values of the light curve. If not given, Fourier parameter
errors will be less reliable. In this case use `error_estimation`.
'''
[docs] def spectral_window(self,
minimum_frequency=None,
maximum_frequency=None,
nyquist_factor=1,
samples_per_peak=10,
plotting=False):
"""
Calculates the spectral window.
Parameters
----------
minimum_frequency : float, optional
If specified, then use this minimum frequency rather than one chosen based on the size
of the baseline.
maximum_frequency : float, optional
If specified, then use this maximum frequency rather than one chosen based on the average
nyquist frequency.
nyquist_factor : float, default: 1
The multiple of the average nyquist frequency used to choose the maximum frequency
if ``maximum_frequency`` is not provided.
samples_per_peak: float, default: 10
The approximate number of desired samples across the typical frequency peak.
plotting: bool, default: False
If `True`, spectral window will be displayed.
Returns
-------
freq : array-like
Frequency grid.
sw : array-like
Spectral window at given frequencies.
"""
ls = LombScargle(self.t, self.y)
lsf = ls.autofrequency( samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor,
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency)
costerm = np.cos(2*np.pi*lsf[:,np.newaxis]*self.t).sum(axis=1)
sinterm = np.sin(2*np.pi*lsf[:,np.newaxis]*self.t).sum(axis=1)
sw = np.sqrt(costerm**2 + sinterm**2)/len(self.t)
if plotting:
fig = plt.figure(figsize=(15,3))
plt.plot(lsf, sw)
plt.xlabel('Frequency (c/d)')
plt.ylabel('Spectral window')
plt.show()
plt.close(fig)
return lsf,sw
[docs] def spectrum(self,
minimum_frequency=None,
maximum_frequency=None,
nyquist_factor=1,
samples_per_peak=10,
plotting=False):
"""
Calculates the classic Fourier spectrum.
Parameters
----------
minimum_frequency : float, optional
If specified, then use this minimum frequency rather than one chosen based on the size
of the baseline.
maximum_frequency : float, optional
If specified, then use this maximum frequency rather than one chosen based on the average
nyquist frequency.
nyquist_factor : float, default: 1
The multiple of the average nyquist frequency used to choose the maximum frequency
if ``maximum_frequency`` is not provided.
samples_per_peak: float, default: 10
The approximate number of desired samples across the typical frequency peak.
plotting: bool, default: False
If `True`, spectrum will be displayed.
Returns
-------
freq : array-like
Frequency grid.
spec : array-like
The Fourier spectrum at given frequencies.
"""
sampling_time = np.median(np.diff(self.t))
maxfreq = 0.5/sampling_time * nyquist_factor
minfreq = 1/self.t.ptp()
if minimum_frequency is not None:
minfreq = float(minimum_frequency)
if maximum_frequency is not None:
maxfreq = float(maximum_frequency)
Nfreqs = int(maxfreq/(1/self.t.ptp())*samples_per_peak)
nu_grid = np.linspace(minfreq,maxfreq,Nfreqs)
nu_grid = nu_grid[:,np.newaxis]
magcorr = self.y - np.mean(self.y)
Ftnu = magcorr * np.exp(-1j * 2*np.pi * self.t * nu_grid)
Ftnu = np.nansum(Ftnu,axis=1) / len(self.y)
FFT = 2*np.abs(Ftnu)
if plotting:
fig = plt.figure(figsize=(15,3))
plt.plot(nu_grid.reshape(-1), FFT)
plt.xlabel('Frequency (c/d)')
plt.ylabel('Amplitude')
plt.show()
plt.close(fig)
return nu_grid.reshape(-1), FFT
[docs]class MultiHarmonicFitter(BaseFitter):
'''
Attributes
----------
t : array-like
Time values of the light curve.
y : array-like
Flux/mag values of the light curve.
error : array-like, optional
Flux/mag errors values of the light curve. If not given, Fourier parameter
errors will be less reliable. In this case use `error_estimation`.
'''
[docs] def lc_model(self, *arg):
"""
Get model light curve with all harmonic components at the same time.
Parameters
----------
time : array
Desired time points where multi-harmonic component fit is desired.
arg : arguments
List of arguments containing the frequency, amplitudes, phases, zero point.
Returns
----------
y : array
Multi-harmonic model light curve.
"""
if not hasattr(self,"kind"):
warn("Please run \'fit_harmonics\' first!")
return None
time = arg[0]
best_freq = arg[1]
nparams = (len(arg)-3)//2
amps = arg[2:2+nparams]
phases = arg[2+nparams:-1]
const = arg[-1]
y = 0
for i in range(len(amps)):
y += self._func(time, amps[i], (i+1)*best_freq, phases[i], kind=self.kind)
y += const
return y
def _estimate_errors(self,seed):
np.random.seed(seed)
if self.error_estimation == 'bootstrap':
tmp_lc = self.lc[np.random.choice( self.lc.shape[0], int(len(self.lc)*self.sample_size), replace=False), :]
elif self.error_estimation == 'montecarlo':
tmp_lc = self.lc.copy()
if self.error is None:
tmp_lc[:,1] += np.random.normal(0,self.yerror,tmp_lc.shape[0])
else:
tmp_lc[:,1] += np.random.normal(0,self.error,tmp_lc.shape[0])
# Subtract mean from time points to decouple frequencies from phases
tmp_lc[:,0] -= tmp_lc[:,0].mean()
lbound = [0]*(1+len(self.amps)) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = [2*self.freqs[0]] + [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
try:
if self.error is None:
tmp_pfit, _ = curve_fit(lambda *args: self.lc_model(*args), tmp_lc[:,0], tmp_lc[:,1],
p0=(self.freqs[0], *self.amps, *self.phases, np.mean(tmp_lc[:,1])),
bounds=bounds, maxfev=5000)
else:
tmp_pfit, _ = curve_fit(lambda *args: self.lc_model(*args), tmp_lc[:,0], tmp_lc[:,1],
p0=(self.freqs[0], *self.amps, *self.phases, np.mean(tmp_lc[:,1])) ,
sigma=tmp_lc[:,2], absolute_sigma=self.absolute_sigma, bounds=bounds, maxfev=5000)
tmp_pfit[1+len(self.amps):-1] = tmp_pfit[1+len(self.amps):-1]%(2*np.pi)
except RuntimeError:
tmp_pfit = [np.nan] * (2 + len(self.amps) +len(self.phases) )
return tmp_pfit
[docs] def fit_harmonics(self,maxharmonics = 3, sigma = 4,
absolute_sigma=True,
plotting = False, scale='flux',
minimum_frequency=None, maximum_frequency=None,
nyquist_factor=1,samples_per_peak=100,
kind='sin',
error_estimation='analytic',ntry=1000,
sample_size=0.7,
parallel=True, ncores=-1,
refit=False,
best_freq=None):
"""
``fit_harmonics`` performs Fourier pre-whitening with harmonic fitting.
Parameters
----------
maxharmonics : int, default: 3
The maximum number of harmonics to be fitted. Pass a very large number
to fit all harmonics, limited by the signal-to-noise ratio.
sigma : float, default: 4
Signal-to-noise ratio above which a frequency is considered significant and kept.
absolute_sigma : bool, default: True
If `True`, error is used in an absolute sense and the estimated parameter covariance
reflects these absolute values.
plotting: bool, default: False
If `True`, fitting steps will be displayed.
scale: 'mag' or 'flux', default: 'flux'
Lightcurve plot scale.
minimum_frequency : float, optional
If specified, then use this minimum frequency rather than one chosen based on the size
of the baseline.
maximum_frequency : float, optional
If specified, then use this maximum frequency rather than one chosen based on the average
nyquist frequency.
nyquist_factor : float, default: 1
The multiple of the average nyquist frequency used to choose the maximum frequency
if ``maximum_frequency`` is not provided.
samples_per_peak: float, default: 100
The approximate number of desired samples across the typical frequency peak.
kind: str, 'sin' or 'cos'
Harmonic _function to be fitted.
error_estimation: `analytic`, `bootstrap` or `montecarlo`, default: `analytic`
If `bootstrap` or `montecarlo` is choosen, boostrap or monte carlo method will be used to estimate parameter uncertainties.
Otherwise given uncertainties are calculated analytically.
ntry: int, default: 1000
Number of resamplings for error estimation.
sample_size: float, default: 0.7
The ratio of data points to be used for bootstrap error estimation in each step.
Applies only if `error_estimation` is set to `bootstrap`.
parallel: bool, default : True
If `True`, sampling for error estimation is performed parallel to speed up the process.
ncores: int, default: -1
Number of CPU cores to be used for parallel error estimation. If `-1`, then all available
cores will be used.
best_freq : float, default: None
If given, then this frequency will be used as the basis of the harmonics,
instead of calculating a Lomb-Scargle spectrum to get a frequency.
Returns
-------
pfit : array-like
Array of fitted parameters. The main frequency, amplitudes and phases of the harmonics,
and the zero point.
perr : array-like
Estimated error of the parameters.
"""
self.sample_size = sample_size
self.kind = kind
self.absolute_sigma = absolute_sigma
self.error_estimation = error_estimation
self.minimum_frequency = minimum_frequency
self.maximum_frequency = maximum_frequency
self.nyquist_factor = nyquist_factor
self.samples_per_peak = samples_per_peak
if minimum_frequency is not None and maximum_frequency is not None:
if minimum_frequency > maximum_frequency:
raise ValueError('Minimum frequency is larger than maximum frequency.')
if maxharmonics<1:
raise ValueError('Number of frequencies must be >=1.')
if maximum_frequency is None and nyquist_factor * (0.5/np.median( np.diff(self.t) )) < 2.:
# Nyquist is low
warn('Nyquist frequency is low!\nYou might want to set maximum frequency instead.')
if error_estimation not in ['analytic','bootstrap','montecarlo']:
raise TypeError('%s method is not supported! Please choose \'analytic\', \'bootstrap\' or \'montecarlo\'.' % str(error_estimation))
# fit periodic funtions and do prewhitening
yres = self.y.copy()
self.freqs = []
self.amps = []
self.phases = []
self.zeropoints = []
self.freqserr = []
self.ampserr = []
self.phaseserr = []
self.zeropointerr = []
for i in range(maxharmonics):
if i == 0:
if best_freq is None:
ls = LombScargle(self.t, yres, nterms=1)
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor)
freq, power = ls.autopower(normalization='psd',
samples_per_peak=2000,
minimum_frequency=max(minimum_frequency if minimum_frequency is not None else 1e-10,
freq[power.argmax()]-5/self.t.ptp()),
maximum_frequency=min(maximum_frequency if maximum_frequency is not None else np.inf,
freq[power.argmax()]+20/self.t.ptp()) )
# LS may return inf values
goodpts = np.isfinite(power)
freq = freq[goodpts]
power = power[goodpts]
# get first spectrum and fit first periodic component
try:
if best_freq is None: best_freq = freq[power.argmax()]
pfit1, pcov1 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, best_freq, phase ,kind=kind) + const,
self.t, yres,
p0=(np.ptp(yres)/4,2.,np.mean(yres)), bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
pfit2, pcov2 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, best_freq, phase ,kind=kind) + const,
self.t, yres,
p0=(np.ptp(yres)/4,5.,np.mean(yres)), bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
chi2_1 = np.sum( (yres-pfit1[-1] - self._func(self.t, pfit1[0], best_freq, pfit1[1] ,kind=kind))**2 )
chi2_2 = np.sum( (yres-pfit2[-1] - self._func(self.t, pfit2[0], best_freq, pfit2[1] ,kind=kind))**2 )
if chi2_1 < chi2_2:
pfit = pfit1
else:
pfit = pfit2
pfit, pcov = curve_fit(lambda time, amp, freq, phase, const: self._func(time, amp, freq, phase, kind=kind) + const,
self.t, yres,
p0=(pfit[0],best_freq,pfit[1],np.mean(yres)),
bounds=([0,0,0,-np.inf], [np.ptp(yres), 2*best_freq, 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
best_freq = pfit[1]
except (RuntimeError,ValueError) as err:
warn(err)
self.pfit = self.perr = [np.nan]*(3+1)
return np.asarray(self.pfit), np.asarray(self.perr)
else:
# --- Check if frequency is greater than maximum_frequency ---
if maximum_frequency is not None and (i+1)*best_freq > maximum_frequency:
# freq > maximum_frequency
warn('Frequency is larger than maximum frequency!\nIncrease maximum frequency to include more peaks!\nSkipping...')
break
elif maximum_frequency is None and (i+1)*best_freq > nyquist_factor * (0.5/np.median( np.diff(self.t) )):
# freq > nyquist_factor * Nyquist
warn('Frequency is larger than nyquist_factor (%d) x Nyquist frequency!\nSet maximum frequency to avoid problems!\nSkipping...' % int(nyquist_factor))
break
# --- Check if max power is still above the noise level ---
ls = LombScargle(self.t, yres, nterms=1)
minf = (i+1)*best_freq - max(0.5,10*1/np.ptp(self.t))
if minf<0 : minf = 0
with np.errstate(divide='ignore',invalid='ignore'):
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minf,
maximum_frequency=(i+1)*best_freq + max(0.5,10*1/np.ptp(self.t)),
samples_per_peak=samples_per_peak)
# Convert LS power to amplitude
power = np.sqrt(4*power/len(self.t))
# LS may return inf values
goodpts = np.isfinite(power)
freq = freq[goodpts]
power = power[goodpts]
power[power<0] = 0
df = 1/np.ptp(self.t)
umpeak = (freq > (i+1)*best_freq - df) & (freq < (i+1)*best_freq + df)
if np.nanmax(power[umpeak]) / np.nanmean(power) < sigma:
# s/n < sigma
break
# --- Check if best period is longer than 2x data duration ---
if np.allclose(freq[np.argmax(power)] ,0) or 1./freq[np.argmax(power)] > 2*np.ptp(self.t):
warn('Period is longer than 2x data duration!\nSet minimum frequency to avoid problems!\nSkipping...')
break
# --- get ith spectrum and fit ith periodic component ---
pfit1, pcov1 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, (i+1)*best_freq, phase, kind=kind) + const,
self.t, yres,
p0=(np.ptp(yres)/4,2.,np.mean(yres)),
bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
pfit2, pcov2 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, (i+1)*best_freq, phase, kind=kind) + const,
self.t, yres,
p0=(np.ptp(yres)/4,5.,np.mean(yres)),
bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
chi2_1 = np.sum( (yres-pfit1[-1] - self._func(self.t, pfit1[0], (i+1)*best_freq, pfit1[1], kind=kind ))**2 )
chi2_2 = np.sum( (yres-pfit2[-1] - self._func(self.t, pfit2[0], (i+1)*best_freq, pfit2[1], kind=kind ))**2 )
if chi2_1 < chi2_2:
pfit = pfit1
pcov = pcov1
else:
pfit = pfit2
pcov = pcov2
if plotting:
# plot phased light curve and fit
if i==0:
per = 1/pfit[1]
else:
per = 1/( (i+1)*best_freq )
# Calculate spectrum for plotting
ls = LombScargle(self.t, yres, nterms=1)
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor)
plt.figure(figsize=(15,3))
plt.subplot(121)
plt.plot(freq, 2*np.sqrt(power/len(self.t)) )
plt.xlabel('Frequency (c/d)')
plt.ylabel('Amplitude')
plt.grid()
plt.subplot(122)
plt.plot(self.t%per/per,yres-pfit[-1],'k.')
plt.plot(self.t%per/per+1,yres-pfit[-1],'k.')
if i == 0:
plt.plot(self.t%per/per,self._func(self.t,pfit[0],pfit[1],pfit[2], kind=kind),'C1.',ms=1)
plt.plot(self.t%per/per+1,self._func(self.t,pfit[0],pfit[1],pfit[2], kind=kind),'C1.',ms=1)
else:
plt.plot(self.t%per/per,self._func(self.t,pfit[0],(i+1)*best_freq,pfit[1], kind=kind),'C1.',ms=1)
plt.plot(self.t%per/per+1,self._func(self.t,pfit[0],(i+1)*best_freq,pfit[1], kind=kind),'C1.',ms=1)
if scale == 'mag': plt.gca().invert_yaxis()
plt.xlabel('Phase (f=%.6f c/d; P=%.5f d)' % (1/per,per))
plt.ylabel('Brightness')
plt.show()
if i==0:
# start to collect results
self.freqs.append( pfit[1] )
self.amps.append( pfit[0] )
self.phases.append( pfit[2] )
self.zeropoints.append( pfit[3] )
pcov = np.sqrt(np.diag(pcov))
self.freqserr.append( pcov[1] )
self.ampserr.append( pcov[0] )
self.phaseserr.append( pcov[2] )
self.zeropointerr.append( pcov[3] )
yres -= self._func(self.t,pfit[0],pfit[1],pfit[2], kind=kind) + pfit[3]
else:
# collect ith results
self.freqs.append( (i+1)*best_freq )
self.amps.append( pfit[0] )
self.phases.append( pfit[1] )
self.zeropoints.append( pfit[2] )
pcov = np.sqrt(np.diag(pcov))
self.freqserr.append( self.freqserr[0] )
self.ampserr.append( pcov[0] )
self.phaseserr.append( pcov[1] )
self.zeropointerr.append( pcov[2] )
yres -= self._func(self.t,pfit[0],(i+1)*best_freq,pfit[1], kind=kind) + pfit[2]
try:
# fit all periodic components at the same time
lbound = [0]*(1+len(self.amps)) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = [2*best_freq] + [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
pfit, pcov = curve_fit(lambda *args: self.lc_model(*args), self.t, self.y,
p0=(self.freqs[0], *self.amps, *self.phases, np.mean(self.y)),
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
# convert all phases into the 0-2pi range
pfit[1+len(self.amps):-1] = pfit[1+len(self.amps):-1]%(2*np.pi)
'''
# if any of the errors is inf, shift light curve by period/2 and get new errors
if not error_estimation and np.any(np.isinf(pcov)):
warn('One of the errors is inf! Shifting light curve by half period to calculate errors...')
lbound = [0]*(1+len(self.amps)) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = [2*best_freq] + [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
_, pcov = curve_fit(lambda *args: self.lc_model(*args), self.t + 0.5/pfit[0], self.y, p0=(self.freqs[0], *self.amps, *[(pha+np.pi)%(2*np.pi) for pha in self.phases], np.mean(self.y)) ,
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
'''
if error_estimation == 'analytic':
self.pfit = pfit
self.perr = self.get_analytic_uncertainties() + [ np.sqrt(np.diag(pcov))[-1] ]
return np.asarray(self.pfit), np.asarray(self.perr)
elif error_estimation != 'analytic' and refit is False:
# use bootstrap or MC to get realistic errors (bootstrap = get subsample and redo fit n times)
if self.error is None: self.lc = np.c_[self.t, self.y]
else: self.lc = np.c_[self.t, self.y, self.error]
self.pfit = pfit
if self.error is None:
#self.yerror = 0.5*np.std(self.get_residual()[1])
self.yerror = stats.median_abs_deviation(self.get_residual()[1])
if error_estimation == 'bootstrap': print('Bootstrapping...',flush=True)
elif error_estimation == 'montecarlo': print('Performing monte carlo...',flush=True)
error_estimation_fit = np.empty( (ntry,len(pfit)) )
seeds = np.random.randint(1e09,size=ntry)
if parallel:
# do error estimation fit parallal
available_ncores = multiprocessing.cpu_count()
if ncores <= -1:
ncores = available_ncores
elif available_ncores<ncores:
ncores = available_ncores
error_estimation_fit = ProgressParallel(n_jobs=ncores,total=ntry)(delayed(self._estimate_errors)(par) for par in seeds)
error_estimation_fit = np.asarray(error_estimation_fit)
else:
for i,seed in tqdm(enumerate(seeds),total=ntry):
error_estimation_fit[i,:] = self._estimate_errors(seed)
# Get rid of nan values
goodpts = np.all(np.isfinite(error_estimation_fit),axis=1)
error_estimation_fit = error_estimation_fit[ goodpts ]
if len(error_estimation_fit) < 4:
print("Increase \'ntry\' or set a higher \'sample_size\'!")
raise RuntimeError('Not enough points to estimate error!')
#meanval = np.nanmean(error_estimation_fit,axis=0)
#perr = np.nanstd(error_estimation_fit,axis=0)
bspercentiles = np.percentile(error_estimation_fit,[16, 50, 84],axis=0)
perr = np.min(np.c_[bspercentiles[2]-bspercentiles[1],bspercentiles[1]-bspercentiles[0]],axis=1)
if plotting:
labels = [r'Freq'] + [r'A$_'+str(i+1)+'$' for i in range(len(self.amps))] + \
[r'$\Phi_'+str(i+1)+'$' for i in range(len(self.phases))] + [r'Zero point']
if np.sum( is_outlier(error_estimation_fit) ) < 0.05*error_estimation_fit.shape[0]:
# Drop outliers if their number is low to not to change the distributions
_ = corner.corner(error_estimation_fit[~is_outlier(error_estimation_fit)],
labels=labels,
quantiles=[0.16, 0.5, 0.84],
show_titles=True, title_kwargs={"fontsize": 12})
else:
_ = corner.corner(error_estimation_fit,
labels=labels,
quantiles=[0.16, 0.5, 0.84],
show_titles=True, title_kwargs={"fontsize": 12})
if np.any(perr == 0.0):
# if any of the errors is 0, shift light curve by period/2 and get new errors
warn('One of the errors is too small! Shifting light curve by half period to calculate errors...')
torig = self.t.copy()
yorig = self.y.copy()
# shifting light curve by period/2
self.t = self.t + 0.5/pfit[0]
_,newperr = self.fit_harmonics(maxharmonics = maxharmonics,
plotting = False,
kind=kind,
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
nyquist_factor=nyquist_factor,
samples_per_peak=samples_per_peak,
error_estimation=error_estimation,
ntry=ntry,
parallel=parallel, ncores=ncores,
sample_size=self.sample_size,
refit=True)
try:
um = np.where( perr == 0.0 )[0]
perr[um] = np.copy(newperr[um])
except IndexError:
pass
self.t = torig
self.y = yorig
self.pfit = pfit
self.perr = perr
return np.asarray(self.pfit), np.asarray(self.perr)
self.pfit = pfit
self.perr = perr
return np.asarray(self.pfit), np.asarray(self.perr)
elif error_estimation != 'analytic' and refit:
# get new errors for shifted light curve
if self.error is None: self.lc = np.c_[self.t, self.y]
else: self.lc = np.c_[self.t, self.y, self.error]
self.pfit = pfit
if self.error is None:
#self.yerror = 0.5*np.std(self.get_residual()[1])
self.yerror = stats.median_abs_deviation(self.get_residual()[1])
error_estimation_fit = np.empty( (ntry,len(pfit)) )
seeds = np.random.randint(1e09,size=ntry)
if parallel:
# do error_estimation fit parallal
available_ncores = multiprocessing.cpu_count()
if ncores <= -1:
ncores = available_ncores
elif available_ncores<ncores:
ncores = available_ncores
error_estimation_fit = ProgressParallel(n_jobs=ncores,total=ntry)(delayed(self._estimate_errors)(par) for par in seeds)
error_estimation_fit = np.asarray(error_estimation_fit)
else:
for i,seed in tqdm(enumerate(seeds),total=ntry):
error_estimation_fit[i,:] = self._estimate_errors(seed)
# Get rid of nan values
goodpts = np.all(np.isfinite(error_estimation_fit),axis=1)
error_estimation_fit = error_estimation_fit[ goodpts ]
if len(error_estimation_fit) < 4:
print("Increase \'ntry\' or set a higher \'sample_size\'!")
raise RuntimeError('Not enough points to estimate error!')
#meanval = np.nanmean(error_estimation_fit,axis=0)
#newperr = np.nanstd(error_estimation_fit,axis=0)
bspercentiles = np.percentile(error_estimation_fit,[16, 50, 84],axis=0)
newperr = np.min(np.c_[bspercentiles[2]-bspercentiles[1],bspercentiles[1]-bspercentiles[0]],axis=1)
self.pfit = pfit
self.perr = newperr
return np.asarray(self.pfit), np.asarray(self.perr)
except RuntimeError:
# if fit all components at once fails return previous results, and errors from covariance matrix
if error_estimation:
warn('%s method failed! Returning errors from pre-whitening steps.' % str(error_estimation))
warn('Something went wrong! Returning errors from pre-whitening steps.')
self.pfit = np.array([self.freqs[0], *self.amps, *self.phases, self.zeropoints[0] ])
self.perr = np.array([self.freqserr[0], *self.ampserr, *self.phaseserr, self.zeropointerr[0] ])
return self.pfit, self.perr
[docs] def get_fourier_parameters(self):
"""
Calculates Fourier parameters from given amplitudes and phases.
Returns
-------
freq : number with uncertainty
Main frequency and its estimated error.
period : number with uncertainty
Main period and its estimated error.
Rn1 : number with uncertainty
Rn1 value(s) and its estimated error(s),
where n is the harmonics order.
Pn1 : number with uncertainty
Phin1 value(s) and its estimated error(s),
where n is the harmonics order.
"""
if not hasattr(self,"pfit"):
warn("Please run \'fit_harmonics\' first!")
return None,None,None,None
if np.all(np.isnan(self.pfit)) or len(self.pfit)<6:
warn('Not enough components to get Fourier parameters!')
#raise ValueError('Not enough components to get Fourier parameters!')
freq = ufloat(np.nan,np.nan)
period = ufloat(np.nan,np.nan)
Rn1 = [ufloat(np.nan,np.nan)]
Pn1 = [ufloat(np.nan,np.nan)]
else:
nfreq = int( (len(self.pfit)-1)/2 )
freq = ufloat(self.pfit[0],self.perr[0])
period = 1/freq
Rn1 = []
Pn1 = []
for i in range(2,nfreq+1):
Rn1.append( ufloat(self.pfit[i],self.perr[i]) / ufloat(self.pfit[1],self.perr[1]) )
Pn1.append( ( ufloat(self.pfit[nfreq+i],self.perr[nfreq+i] )- i* ufloat(self.pfit[nfreq+1],self.perr[nfreq+1]) )%(2*np.pi) )
return freq,period,Rn1,Pn1
[docs] def get_residual(self):
"""
Calculates the residual light curve after multi-harmonic Fourier fitting.
Returns
-------
t : array
Time stamps.
y : array
Residual flux/amp.
yerr : array, optional
If input errors were given, then error of residual flux/amp.
"""
if not hasattr(self,"pfit"):
warn("Please run \'fit_harmonics\' first!")
if self.error is None:
return None,None
else:
return None,None,None
if self.error is None:
return self.t, self.y - self.lc_model(self.t, *self.pfit), np.ones_like(self.t)*np.nan
else:
return self.t, self.y - self.lc_model(self.t, *self.pfit), self.error
[docs] def get_analytic_uncertainties(self):
"""
Calculates analytically derived uncertainties for multi-harmonic fit.
Method is based on Breger et al. 1999.
Returns
-------
perr : array-like
Estimated error of the frequency and amplitudes, phases.
"""
if not hasattr(self,"pfit"):
warn("Please run \'fit_harmonics\' first!")
return None
resy = self.get_residual()[1]
sigf = self._analytic_uncertainties(self.t,resy,self.pfit[1])[0]
sigA = []
sigPhi = []
ncomponents = int((len(self.pfit)-1)/2)
for i in range(1, ncomponents + 1 ):
sigmas = self._analytic_uncertainties(self.t,resy,self.pfit[i])
sigA.append( sigmas[1] )
sigPhi.append( sigmas[2] )
return [sigf] + sigA + sigPhi
[docs]class MultiFrequencyFitter(BaseFitter):
[docs] def lc_model(self, *arg):
"""
Get model light curve with all frequency components at the same time.
Parameters
----------
time : array
Desired time points where multi-frequency fit is desired.
arg : arguments
List of arguments containing the frequencies, amplitudes, phases
of each periodic component and the zero point.
Returns
----------
y : array
Multi-frequency model light curve.
"""
if not hasattr(self,"kind"):
warn("Please run \'fit_freqs\' first!")
return None
time = arg[0]
nparams = (len(arg)-2)//3
freqs = arg[1:nparams+1]
amps = arg[1+nparams:1+2*nparams]
phases = arg[1+2*nparams:-1]
const = arg[-1]
y = 0
for i in range(len(amps)):
y += self._func(time, amps[i], freqs[i], phases[i], kind=self.kind)
y += const
return y
def _estimate_errors(self,seed):
np.random.seed(seed)
if self.error_estimation == 'bootstrap':
tmp_lc = self.lc[np.random.choice( self.lc.shape[0], int(len(self.lc)*self.sample_size), replace=False), :]
elif self.error_estimation == 'montecarlo':
tmp_lc = self.lc.copy()
if self.error is None:
tmp_lc[:,1] += np.random.normal(0,self.yerror,tmp_lc.shape[0])
else:
tmp_lc[:,1] += np.random.normal(0,self.error,tmp_lc.shape[0])
# Subtract mean from time points to decouple frequencies from phases
tmp_lc[:,0] -= tmp_lc[:,0].mean()
lbound = list( np.array(self.freqs) - 0.1 )
lbound += [0]*len(self.amps) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = list( np.array(self.freqs) + 0.1 )
ubound += [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
try:
if self.error is None:
tmp_pfit, _ = curve_fit(lambda *args: self.lc_model(*args), tmp_lc[:,0], tmp_lc[:,1],
p0=(*self.freqs, *self.amps, *self.phases, np.mean(tmp_lc[:,1])),
bounds=bounds, maxfev=5000)
else:
tmp_pfit, _ = curve_fit(lambda *args: self.lc_model(*args), tmp_lc[:,0], tmp_lc[:,1],
p0=(*self.freqs, *self.amps, *self.phases, np.mean(tmp_lc[:,1])) ,
sigma=tmp_lc[:,2], absolute_sigma=self.absolute_sigma, bounds=bounds, maxfev=5000)
tmp_pfit[2*len(self.amps):-1] = tmp_pfit[2*len(self.amps):-1]%(2*np.pi)
except RuntimeError:
tmp_pfit = [np.nan] * ( len(self.freqs) + len(self.amps) + len(self.phases) + 1 )
return tmp_pfit
[docs] def fit_freqs(self, maxfreqs = 3, sigma = 4, boxwidth = 1,
absolute_sigma=True,
plotting = False, scale='flux',
minimum_frequency=None, maximum_frequency=None,
nyquist_factor=1,samples_per_peak=100,
kind='sin',
error_estimation='analytic',ntry=1000,
sample_size=0.7,
parallel=True, ncores=-1,
refit=False):
"""
``fit_freqs`` performs consecutive Fourier pre-whitening with given number of frequencies.
Parameters
----------
maxfreqs : int, default: 3
The number of frequencies to be fitted. Pass a very large number to fit all frequencies,
limited by the signal-to-noise ratio (`sigma`).
sigma : float, default: 4
Signal-to-noise ratio above which a frequency is considered significant and kept.
boxwidth : float, default: 1
The frequency range to be used to calculate noise in the residual spectrum.
absolute_sigma : bool, default: True
If `True`, error is used in an absolute sense and the estimated parameter covariance
reflects these absolute values.
plotting: bool, default: False
If `True`, fitting steps will be displayed.
scale: 'mag' or 'flux', default: 'flux'
Lightcurve plot scale.
minimum_frequency : float, optional
If specified, then use this minimum frequency rather than one chosen based on the size
of the baseline.
maximum_frequency : float, optional
If specified, then use this maximum frequency rather than one chosen based on the average
nyquist frequency.
nyquist_factor : float, default: 1
The multiple of the average nyquist frequency used to choose the maximum frequency
if ``maximum_frequency`` is not provided.
samples_per_peak: float, default: 100
The approximate number of desired samples across the typical frequency peak.
kind: str, 'sin' or 'cos'
Harmonic _function to be fitted.
error_estimation: `analytic`, `bootstrap` or `montecarlo`, default: `analytic`
If `bootstrap` or `montecarlo` is choosen, boostrap or monte carlo method will be used to estimate parameter uncertainties.
Otherwise given uncertainties are calculated analytically.
ntry: int, default: 1000
Number of resamplings for error estimation.
sample_size: float, default: 0.7
The ratio of data points to be used for bootstrap error estimation in each step.
Applies only if `error_estimation` is set to `bootstrap`.
parallel: bool, default : True
If `True`, sampling for error estimation is performed parallel to speed up the process.
ncores: int, default: -1
Number of CPU cores to be used for parallel error estimation. If `-1`, then all available
cores will be used.
Returns
-------
pfit : array-like
Array of fitted parameters.
The frequencies, amplitudes and phases, and the zero point.
perr : array-like
Estimated error of the parameters.
"""
self.sample_size = sample_size
self.kind = kind
self.absolute_sigma = absolute_sigma
self.error_estimation = error_estimation
if minimum_frequency is not None and maximum_frequency is not None:
if minimum_frequency > maximum_frequency:
raise ValueError('Minimum frequency is larger than maximum frequency.')
if maxfreqs<1:
raise ValueError('Number of frequencies must be >=1.')
if maximum_frequency is None and nyquist_factor * (0.5/np.median( np.diff(self.t) )) < 2.:
# Nyquist is low
warn('Nyquist frequency is low!\nYou might want to set maximum frequency instead.')
if error_estimation not in ['analytic','bootstrap','montecarlo']:
raise TypeError('%s method is not supported! Please choose \'analytic\', \'bootstrap\' or \'montecarlo\'.' % str(error_estimation))
# fit periodic funtions and do prewhitening
yres = self.y.copy()
self.freqs = []
self.amps = []
self.phases = []
self.zeropoints = []
self.freqserr = []
self.ampserr = []
self.phaseserr = []
self.zeropointerr = []
for i in range(maxfreqs):
ls = LombScargle(self.t, yres, nterms=1)
with np.errstate(divide='ignore',invalid='ignore'):
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor)
# Convert LS power to amplitude
power = np.sqrt(4*power/len(self.t))
# LS may return inf values
goodpts = np.isfinite(power)
freq = freq[goodpts]
power = power[goodpts]
power[power<0] = 0
# --- Check if best period is longer than 2x data duration ---
if np.allclose(freq[np.argmax(power)] ,0) or 1./freq[np.argmax(power)] > 2*np.ptp(self.t):
warn('Period is longer than 2x data duration!\nSet minimum frequency to avoid problems!\nSkipping...')
break
# Resample spectrum around highest peak
with np.errstate(divide='ignore',invalid='ignore'):
freq, power = ls.autopower(normalization='psd',
samples_per_peak=2000,
minimum_frequency=max(minimum_frequency if minimum_frequency is not None else 1e-10,
freq[power.argmax()]-5/self.t.ptp()),
maximum_frequency=min(maximum_frequency if maximum_frequency is not None else np.inf,
freq[power.argmax()]+20/self.t.ptp()) )
# Convert LS power to amplitude
power = np.sqrt(4*power/len(self.t))
# LS may return inf values
goodpts = np.isfinite(power)
freq = freq[goodpts]
power = power[goodpts]
power[power<0] = 0
# fit periodic component
try:
best_freq = freq[power.argmax()]
pfit1, pcov1 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, best_freq, phase ,kind=kind) + const, self.t, yres,
p0=(np.ptp(yres)/4,2.,np.mean(yres)), bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
pfit2, pcov2 = curve_fit(lambda time, amp, phase, const: self._func(time, amp, best_freq, phase ,kind=kind) + const, self.t, yres,
p0=(np.ptp(yres)/4,5.,np.mean(yres)), bounds=([0,0,-np.inf], [np.ptp(yres), 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
chi2_1 = np.sum( (yres-pfit1[-1] - self._func(self.t, pfit1[0], best_freq, pfit1[1] ,kind=kind))**2 )
chi2_2 = np.sum( (yres-pfit2[-1] - self._func(self.t, pfit2[0], best_freq, pfit2[1] ,kind=kind))**2 )
if chi2_1 < chi2_2:
pfit = pfit1
else:
pfit = pfit2
pfit, pcov = curve_fit(lambda time, amp, freq, phase, const: self._func(time, amp, freq, phase, kind=kind) + const,
self.t, yres,
p0=(pfit[0],best_freq,pfit[1],np.mean(yres)),
bounds=([0,0,0,-np.inf], [np.ptp(yres), 2*best_freq, 2*np.pi, np.inf]) ,
sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
best_freq = pfit[1]
if np.allclose(best_freq ,0):
raise ValueError("Found period is infinite. Skipping...")
except (RuntimeError,ValueError) as err:
if i == 0:
warn(str(err))
self.pfit = self.perr = [np.nan]*(3+1)
return self.pfit, self.perr
else:
break
# Check peak S/N
self.pfit = self.freqs + [pfit[1]]
self.pfit += self.amps + [pfit[0]]
self.pfit += self.phases + [pfit[2]]
self.pfit += [( self.zeropoints + [pfit[3]] )[0]]
yres_noise = self.get_residual()[1]
ls = LombScargle(self.t, yres_noise, nterms=1)
with np.errstate(divide='ignore',invalid='ignore'):
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor)
# Convert LS power to amplitude
power = np.sqrt(4*power/len(self.t))
# LS may return inf values
goodpts = np.isfinite(power)
freq = freq[goodpts]
power = power[goodpts]
power[power<0] = 0
# s/n < sigma within boxwidth around peak
um = (freq >= pfit[1]-boxwidth/2) & (freq <= pfit[1]+boxwidth/2)
spern = pfit[0] / np.mean(power[um])
if spern < sigma:
warn('Reached the %.1f sigma limit' % sigma)
break
if plotting:
# plot phased light curve and fit
per = 1/pfit[1]
# Calculate spectrum for plotting
ls = LombScargle(self.t, yres, nterms=1)
freq, power = ls.autopower(normalization='psd',
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor)
# Convert LS power to amplitude
power = np.sqrt(4*power/len(self.t))
plt.figure(figsize=(15,3))
plt.subplot(121)
plt.plot(freq, power,label='S/N=%.1f' % spern)
plt.xlabel('Frequency (c/d)')
plt.ylabel('Amplitude')
plt.legend()
plt.grid()
plt.subplot(122)
plt.plot(self.t%per/per,yres-pfit[-1],'k.')
plt.plot(self.t%per/per+1,yres-pfit[-1],'k.')
plt.plot(self.t%per/per,self._func(self.t,pfit[0],pfit[1],pfit[2], kind=kind),'C1.',ms=1)
plt.plot(self.t%per/per+1,self._func(self.t,pfit[0],pfit[1],pfit[2], kind=kind),'C1.',ms=1)
if scale == 'mag': plt.gca().invert_yaxis()
plt.xlabel('Phase (f=%.6f c/d; P=%.5f d)' % (1/per,per))
plt.ylabel('Brightness')
plt.show()
# Collect results
self.freqs.append( pfit[1] )
self.amps.append( pfit[0] )
self.phases.append( pfit[2] )
self.zeropoints.append( pfit[3] )
pcov = np.sqrt(np.diag(pcov))
self.freqserr.append( pcov[1] )
self.ampserr.append( pcov[0] )
self.phaseserr.append( pcov[2] )
self.zeropointerr.append( pcov[3] )
# fit all amplitudes+phases at the same time
try:
# lbound = list( np.array(self.amps) - 0.01*np.array(self.amps) )
lbound = [0]*len(self.amps) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
pfit, pcov = curve_fit(lambda *args: self.lc_model(args[0], *self.freqs, *args[1:]), self.t, self.y,
p0=(*self.amps, *self.phases, self.zeropoints[0]),
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
# convert all phases into the 0-2pi range
pfit[len(self.amps):-1] = pfit[len(self.amps):-1]%(2*np.pi)
self.amps = list(pfit[:len(self.amps)])
self.phases = list(pfit[len(self.amps):-1])
self.zeropoints = [pfit[-1]]*len(self.zeropoints)
except RuntimeError:
pfit = self.amps + self.phases + [self.zeropoints[0]]
self.pfit = pfit
# fit all periodic components at the same time
try:
df = 1/self.t.ptp()
lbound = list( np.array(self.freqs) - df )
lbound += [0]*len(self.amps)
lbound += [-np.inf]*len(self.phases) + [-np.inf]
ubound = list( np.array(self.freqs) + df )
ubound += [np.ptp(self.y)]*len(self.amps) #list(3*np.array(self.amps)) #
ubound += [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
pfit, pcov = curve_fit(lambda *args: self.lc_model(*args), self.t, self.y,
p0=(*self.freqs, *self.amps, *self.phases, self.zeropoints[0]),
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=100)
# convert all phases into the 0-2pi range
pfit[2*len(self.amps):-1] = pfit[2*len(self.amps):-1]%(2*np.pi)
self.freqs = list(pfit[:len(self.amps)])
self.amps = list(pfit[len(self.amps):2*len(self.amps)])
self.phases = list(pfit[2*len(self.amps):-1])
self.zeropoints = [pfit[-1]]*len(self.zeropoints)
self.pfit = pfit
except RuntimeError as err:
print(str(err))
self.pfit = self.freqs + list(pfit)
# Pre-whitening with all frequency components
yres = self.get_residual()[1]
# --- Error estimation after all frequencies are given ---
try:
# fit all periodic components at the same time
zpfit = np.mean(self.y) if len(self.zeropoints)==0 else self.zeropoints[0]
df = 1/self.t.ptp()
lbound = list( np.array(self.freqs) - df )
lbound += [0]*len(self.amps)
lbound += [-np.inf]*len(self.phases) + [-np.inf]
ubound = list( np.array(self.freqs) + df )
ubound += [np.ptp(self.y)]*len(self.amps) #list(3*np.array(self.amps)) #
ubound += [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
pfit, pcov = curve_fit(lambda *args: self.lc_model(*args), self.t, self.y,
p0=(*self.freqs, *self.amps, *self.phases, zpfit),
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=100)
# convert all phases into the 0-2pi range
pfit[2*len(self.amps):-1] = pfit[2*len(self.amps):-1]%(2*np.pi)
'''
# if any of the errors is inf, shift light curve by period/2 and get new errors
if not error_estimation and np.any(np.isinf(pcov)):
warn('One of the errors is inf! Shifting light curve by half period to calculate errors...')
lbound = [0]*(1+len(self.amps)) + [-np.inf]*len(self.phases) + [-np.inf]
ubound = [2*best_freq] + [np.ptp(self.y)]*len(self.amps) + [np.inf]*len(self.phases) + [np.inf]
bounds = (lbound,ubound)
_, pcov = curve_fit(lambda *args: self.lc_model(*args), self.t + 0.5/pfit[0], self.y, p0=(self.freqs[0], *self.amps, *[(pha+np.pi)%(2*np.pi) for pha in self.phases], np.mean(self.y)) ,
bounds=bounds, sigma=self.error, absolute_sigma=absolute_sigma, maxfev=5000)
'''
except RuntimeError as err:
print(str(err))
pfit = self.freqs + self.amps + self.phases + self.zeropoints[:1]
pfit = np.array(pfit)
pcov = np.ones((len(pfit),len(pfit)))
inds = np.diag_indices_from(pcov)
pcov[inds] = self.freqserr + self.ampserr + self.phaseserr + self.zeropointerr[:1]
pcov = pcov**2
# --- Performing error estimation ---
try:
if error_estimation == 'analytic':
self.pfit = pfit
self.perr = self.get_analytic_uncertainties() + [ np.sqrt(np.diag(pcov))[-1] ]
self.pfit, self.perr = sort_by_amplitude(self.pfit, self.perr)
return self.pfit, self.perr
elif error_estimation != 'analytic' and refit is False:
# use bootstrap or MC to get realistic errors (bootstrap = get subsample and redo fit n times)
if self.error is None: self.lc = np.c_[self.t, self.y]
else: self.lc = np.c_[self.t, self.y, self.error]
self.pfit = pfit
if self.error is None:
#self.yerror = 0.5*np.std(self.get_residual()[1])
self.yerror = stats.median_abs_deviation(self.get_residual()[1])
if error_estimation == 'bootstrap': print('Bootstrapping...',flush=True)
elif error_estimation == 'montecarlo': print('Performing monte carlo...',flush=True)
error_estimation_fit = np.empty( (ntry,len(pfit)) )
seeds = np.random.randint(1e09,size=ntry)
if parallel:
# do error estimation fit parallal
available_ncores = multiprocessing.cpu_count()
if ncores <= -1:
ncores = available_ncores
elif available_ncores<ncores:
ncores = available_ncores
error_estimation_fit = ProgressParallel(n_jobs=ncores,total=ntry)(delayed(self._estimate_errors)(par) for par in seeds)
error_estimation_fit = np.asarray(error_estimation_fit)
else:
for i,seed in tqdm(enumerate(seeds),total=ntry):
error_estimation_fit[i,:] = self._estimate_errors(seed)
# Get rid of nan values
goodpts = np.all(np.isfinite(error_estimation_fit),axis=1)
error_estimation_fit = error_estimation_fit[ goodpts ]
if len(error_estimation_fit) < 4:
print("Increase\'ntry\' or set a higher \'sample_size\'!")
raise RuntimeError('Not enough points to estimate error!')
#meanval = np.nanmean(error_estimation_fit,axis=0)
#perr = np.nanstd(error_estimation_fit,axis=0)
bspercentiles = np.percentile(error_estimation_fit,[16, 50, 84],axis=0)
perr = np.min(np.c_[bspercentiles[2]-bspercentiles[1],bspercentiles[1]-bspercentiles[0]],axis=1)
if plotting:
labels = [r'f$_'+str(i+1)+'$' for i in range(len(self.freqs))] + \
[r'A$_'+str(i+1)+'$' for i in range(len(self.amps))] + \
[r'$\Phi_'+str(i+1)+'$' for i in range(len(self.phases))] + [r'Zero point']
if np.sum( is_outlier(error_estimation_fit) ) < 0.05*error_estimation_fit.shape[0]:
# Drop outliers if their number is low to not to change the distributions
_ = corner.corner(error_estimation_fit[~is_outlier(error_estimation_fit)],
labels=labels,
quantiles=[0.16, 0.5, 0.84],
show_titles=True, title_kwargs={"fontsize": 12})
else:
_ = corner.corner(error_estimation_fit,
labels=labels,
quantiles=[0.16, 0.5, 0.84],
show_titles=True, title_kwargs={"fontsize": 12})
if np.any(perr == 0.0):
# if any of the errors is 0, shift light curve by period/2 and get new errors
warn('One of the errors is too small! Shifting light curve by half period to calculate errors...')
torig = self.t.copy()
yorig = self.y.copy()
# shifting light curve by period/2
self.t = self.t + 0.5/pfit[0]
_,newperr = self.fit_freqs(maxfreqs = maxfreqs, sigma=sigma,
plotting = False,
kind=kind,
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
nyquist_factor=nyquist_factor,
samples_per_peak=samples_per_peak,
error_estimation=error_estimation,
ntry=ntry,
parallel=parallel, ncores=ncores,
sample_size=self.sample_size,
refit=True)
try:
um = np.where( perr == 0.0 )[0]
perr[um] = np.copy(newperr[um])
except IndexError:
pass
self.t = torig
self.y = yorig
self.pfit = pfit
self.perr = perr
self.pfit, self.perr = sort_by_amplitude(self.pfit, self.perr)
return self.pfit, self.perr
self.pfit = pfit
self.perr = perr
self.pfit, self.perr = sort_by_amplitude(self.pfit, self.perr)
return self.pfit, self.perr
elif error_estimation != 'analytic' and refit:
# get new errors for shifted light curve
if self.error is None: self.lc = np.c_[self.t, self.y]
else: self.lc = np.c_[self.t, self.y, self.error]
self.pfit = pfit
if self.error is None:
#self.yerror = 0.5*np.std(self.get_residual()[1])
self.yerror = stats.median_abs_deviation(self.get_residual()[1])
error_estimation_fit = np.empty( (ntry,len(pfit)) )
seeds = np.random.randint(1e09,size=ntry)
if parallel:
# do error_estimation fit parallal
available_ncores = multiprocessing.cpu_count()
if ncores <= -1:
ncores = available_ncores
elif available_ncores<ncores:
ncores = available_ncores
error_estimation_fit = ProgressParallel(n_jobs=ncores,total=ntry)(delayed(self._estimate_errors)(par) for par in seeds)
error_estimation_fit = np.asarray(error_estimation_fit)
else:
for i,seed in tqdm(enumerate(seeds),total=ntry):
error_estimation_fit[i,:] = self._estimate_errors(seed)
# Get rid of nan values
goodpts = np.all(np.isfinite(error_estimation_fit),axis=1)
error_estimation_fit = error_estimation_fit[ goodpts ]
if len(error_estimation_fit) < 4:
print("Increase \'ntry\' or set a higher \'sample_size\'!")
raise RuntimeError('Not enough points to estimate error!')
#meanval = np.nanmean(error_estimation_fit,axis=0)
#newperr = np.nanstd(error_estimation_fit,axis=0)
bspercentiles = np.percentile(error_estimation_fit,[16, 50, 84],axis=0)
newperr = np.min(np.c_[bspercentiles[2]-bspercentiles[1],bspercentiles[1]-bspercentiles[0]],axis=1)
self.pfit = pfit
self.perr = newperr
self.pfit, self.perr = sort_by_amplitude(self.pfit, self.perr)
return self.pfit, self.perr
except RuntimeError:
# if fit all components at once fails return previous results, and errors from covariance matrix
if error_estimation:
warn('%s method failed! Returning errors from pre-whitening steps.' % str(error_estimation))
warn('Something went wrong! Returning errors from pre-whitening steps.')
self.pfit = np.array([*self.freqs, *self.amps, *self.phases, self.zeropoints[0] ])
self.perr = np.array([*self.freqserr, *self.ampserr, *self.phaseserr, self.zeropointerr[0] ])
self.pfit, self.perr = sort_by_amplitude(self.pfit, self.perr)
return self.pfit, self.perr
[docs] def get_residual(self):
"""
Calculates the residual light curve after multi-frequency fitting.
Returns
-------
t : array
Time stamps.
y : array
Residual flux/amp.
yerr : array, optional
If input errors were given, then error of residual flux/amp.
"""
if not hasattr(self,"pfit"):
warn("Please run \'fit_freqs\' first!")
if self.error is None:
return None,None
else:
return None,None,None
if self.error is None:
return self.t, self.y - self.lc_model(self.t, *self.pfit), np.ones_like(self.t)*np.nan
else:
return self.t, self.y - self.lc_model(self.t, *self.pfit), self.error
[docs] def get_analytic_uncertainties(self):
"""
Calculates analytically derived uncertainties for multi-frequency fit.
Method is based on Breger et al. 1999.
Returns
-------
perr : array-like
Estimated error of the frequencies, amplitudes, phases.
"""
resy = self.get_residual()[1]
sigf = []
sigA = []
sigPhi = []
ncomponents = int((len(self.pfit)-1)/3)
for i in range(ncomponents):
sigmas = self._analytic_uncertainties(self.t,resy,self.pfit[ncomponents+i])
sigf.append( sigmas[0] )
sigA.append( sigmas[1] )
sigPhi.append( sigmas[2] )
return sigf + sigA + sigPhi