Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lombscargle.py #828

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changes/828.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a compute_rms function to LombScarglePowerspectrum
119 changes: 118 additions & 1 deletion stingray/lombscargle.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import copy
from collections.abc import Iterable
import warnings
from typing import Optional, Union

import numpy as np
import numpy.typing as npt
import scipy
import scipy.stats
from astropy.timeseries.periodograms import LombScargle

from .crossspectrum import Crossspectrum
from .events import EventList
from .exceptions import StingrayError
from .fourier import impose_symmetry_lsft, lsft_fast, lsft_slow
from .fourier import (
impose_symmetry_lsft,
lsft_fast,
lsft_slow,
get_rms_from_unnorm_periodogram,
poisson_level,
unnormalize_periodograms,
)
from .lightcurve import Lightcurve
from .utils import simon

Expand Down Expand Up @@ -352,6 +362,7 @@ def _initialize_empty(self):
self.oversampling = None
self.variance1 = None
self.variance2 = None
self.variance = None
return

def time_lag(self):
Expand Down Expand Up @@ -388,6 +399,112 @@ def from_lc_iterable(self):
"Object has no attribute named 'from_lc_iterable' ! Not applicable for unevenly sampled data"
)

if self.df is None:
self.df = self.freq[1] - self.freq[0]

def compute_rms(self, min_freq, max_freq, poisson_noise_level=None):
"""
Compute the fractional rms amplitude in the power spectrum
between two frequencies.

Parameters
----------
min_freq: float
The lower frequency bound for the calculation.

max_freq: float
The upper frequency bound for the calculation.

Other parameters
----------------
poisson_noise_level : float, default is None
This is the Poisson noise level of the PDS with same
normalization as the PDS. If poissoin_noise_level is None,
the Poisson noise is calculated in the idealcase
e.g. 2./<countrate> for fractional rms normalisation
Dead time and other instrumental effects can alter it.
The user can fit the Poisson noise level outside
this function using the same normalisation of the PDS
and it will get subtracted from powers here.

Returns
-------
rms: float
The fractional rms amplitude contained between ``min_freq`` and
``max_freq``.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a docstring to the function

rms_err: float
The error on the fractional rms amplitude.

"""
good = (self.freq >= min_freq) & (self.freq <= max_freq)

M_freq = self.m
K_freq = self.k

if isinstance(self.k, Iterable):
K_freq = self.k[good]

if isinstance(self.m, Iterable):
M_freq = self.m[good]

if poisson_noise_level is None:
poisson_noise_unnorm = poisson_level("none", n_ph=self.nphots)
else:
poisson_noise_unnorm = unnormalize_periodograms(
poisson_noise_level, self.dt, self.n, self.nphots, norm=self.norm
)

rms, rmse = get_rms_from_unnorm_periodogram(
self.unnorm_power[good],
self.nphots,
self.df * K_freq,
M=M_freq,
poisson_noise_unnorm=poisson_noise_unnorm,
segment_size=None,
kind="frac",
)

return rms, rmse

def _rms_error(self, powers):
r"""
Compute the error on the fractional rms amplitude using error
propagation.
Note: this uses the actual measured powers, which is not
strictly correct. We should be using the underlying power spectrum,
but in the absence of an estimate of that, this will have to do.

.. math::

r = \sqrt{P}

.. math::

\delta r = \\frac{1}{2 * \sqrt{P}} \delta P

Parameters
----------
powers: iterable
The list of powers used to compute the fractional rms amplitude.

Returns
-------
delta_rms: float
The error on the fractional rms amplitude.
"""
nphots = self.nphots
p_err = scipy.stats.chi2(2.0 * self.m).var() * powers / self.m / nphots

rms = np.sum(powers) / nphots
pow = np.sqrt(rms)

drms_dp = 1 / (2 * pow)

sq_sum_err = np.sqrt(np.sum(p_err**2))
delta_rms = sq_sum_err * drms_dp
return delta_rms


class LombScarglePowerspectrum(LombScargleCrossspectrum):
type = "powerspectrum"
Expand Down
65 changes: 60 additions & 5 deletions stingray/tests/test_lombscargle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pytest
from scipy.interpolate import interp1d
from astropy.modeling.models import Lorentz1D

from stingray.events import EventList
from stingray.exceptions import StingrayError
Expand All @@ -11,6 +12,8 @@
from stingray.lombscargle import _autofrequency
from stingray.simulator import Simulator

rng = np.random.RandomState(20150907)


def test_autofrequency():
freqs = _autofrequency(min_freq=0.1, max_freq=0.5, df=0.1)
Expand Down Expand Up @@ -42,15 +45,15 @@ def setup_class(self):
t = lc1.time
self.time = lc1.time
t_new = t.copy()
t_new[1:-1] = t[1:-1] + (np.random.rand(len(t) - 2) / (high - low))
t_new[1:-1] = t[1:-1] + (rng.rand(len(t) - 2) / (high - low))
s1_new = interp1d(t, s1, fill_value="extrapolate")(t_new)
s2_new = interp1d(t, s2, fill_value="extrapolate")(t_new)
self.lc1 = Lightcurve(t, s1_new, dt=lc1.dt)
self.lc2 = Lightcurve(t, s2_new, dt=lc2.dt)
self.lscs = LombScargleCrossspectrum(lc1, lc2)

def test_eventlist(self):
counts = np.random.poisson(10, 1000)
counts = rng.poisson(10, 1000)
times = np.arange(0, 1000, 1)
lc1 = Lightcurve(times, counts, dt=1)
lc2 = Lightcurve(times, counts, dt=1)
Expand Down Expand Up @@ -225,7 +228,7 @@ def test_time_phase_lag(self, phase_lag):
def func(time, phase=0):
return 2 + np.sin(2 * np.pi * (time * freq - phase))

time = np.sort(np.random.uniform(0, 100, 3000))
time = np.sort(rng.uniform(0, 100, 3000))

with pytest.warns(UserWarning):
lc1 = Lightcurve(time, func(time, 0))
Expand All @@ -249,7 +252,7 @@ def setup_class(self):
s1 = lc.counts
t = lc.time
t_new = t.copy()
t_new[1:-1] = t[1:-1] + (np.random.rand(len(t) - 2) / (high - low))
t_new[1:-1] = t[1:-1] + (rng.rand(len(t) - 2) / (high - low))
s_new = interp1d(t, s1, fill_value="extrapolate")(t_new)
self.lc = Lightcurve(t, s_new, dt=lc.dt)

Expand Down Expand Up @@ -279,8 +282,60 @@ def test_make_empty_powerspectrum(self):
assert ps.method is None

def test_ps_real(self):
counts = np.random.poisson(10, 1000)
counts = rng.poisson(10, 1000)
times = np.arange(0, 1000, 1)
lc = Lightcurve(times, counts, dt=1)
ps = LombScarglePowerspectrum(lc)
assert np.allclose(ps.power.imag, np.zeros_like(ps.power.imag), atol=1e-4)


class TestRMS(object):
@classmethod
def setup_class(cls):
fwhm = 0.23456
cls.segment_size = 256
cls.df = 1 / cls.segment_size

cls.freqs = np.arange(cls.df, 1, cls.df)
dt = 0.5 / cls.freqs.max()

pds_shape_func = Lorentz1D(x_0=0, fwhm=fwhm)
cls.pds_shape_raw = pds_shape_func(cls.freqs)
cls.M = 1
cls.nphots = 1_000_000
cls.rms = 0.5
meanrate = cls.nphots / cls.segment_size
cls.poisson_noise_rms = 2 / meanrate
pds_shape_rms = cls.pds_shape_raw / np.sum(cls.pds_shape_raw * cls.df) * cls.rms**2
pds_shape_rms += cls.poisson_noise_rms

random_part = rng.chisquare(2 * cls.M, size=cls.pds_shape_raw.size) / 2 / cls.M
pds_rms_noisy = random_part * pds_shape_rms

pds_unnorm = pds_rms_noisy * meanrate / 2 * cls.nphots
cls.pds = LombScarglePowerspectrum()
cls.pds.freq = cls.freqs
cls.pds.unnorm_power = pds_unnorm
cls.pds.power = pds_rms_noisy
cls.pds.df = cls.df
cls.pds.m = cls.M
cls.pds.nphots = cls.nphots
cls.pds.norm = "frac"
cls.pds.dt = dt
cls.pds.n = cls.pds.freq.size

@pytest.mark.parametrize("norm", ["none", "frac", "leahy", "abs"])
def test_rms(self, norm):
pds = self.pds.to_norm(norm)
with pytest.warns(UserWarning, match="All power spectral bins have M<30."):
rms_from_ps, rmse_from_ps = pds.compute_rms(self.freqs.min(), self.freqs.max())
assert np.isclose(rms_from_ps, self.rms, atol=3 * rmse_from_ps)

@pytest.mark.parametrize("norm", ["none", "frac", "leahy", "abs"])
def test_rms_rebinning(self, norm):
pds = self.pds.to_norm(norm)
pds = pds.rebin_log(0.04)
with pytest.warns(UserWarning, match="All power spectral bins have M<30."):
rms_from_ps, rmse_from_ps = pds.compute_rms(self.freqs.min(), self.freqs.max())

assert np.isclose(rms_from_ps, self.rms, atol=3 * rmse_from_ps)
Loading