from typing import Optional, Tuple, Union, cast
import numpy as np
import numpy.typing as npt
import healpy as hp
from scipy.interpolate import interpn
from scipy.sparse import csr_array
from scipy.special import legendre_p_all, sph_harm_y_all, gammaln
from numpy.polynomial.laguerre import laggauss
from .distribution import _log10pi
from .distribution import _norm, _unnormalized_pdf, _unnormalized_cdf
from .utils import _build_marginalized_grid, angular_distance
[docs]
class KingPDF:
"""
Calculate the probability density function (PDF) and cumulative distribution
function (CDF) for the King spatial distribution.
This class manages PDF and CDF evaluations with support for angular cutoffs
and proper normalization over the sphere.
Parameters
----------
angular_cutoff : float, optional
Maximum angular separation in radians. Default is pi (full sphere).
"""
[docs]
def __init__(
self,
*,
angular_cutoff: float = np.pi,
) -> None:
self.angular_cutoff = angular_cutoff
[docs]
def norm(
self,
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
) -> Union[float, npt.NDArray[np.floating]]:
"""
Compute the normalization constant for given King parameters.
Parameters
----------
alpha : float or ndarray
King distribution alpha parameter (scale) in radians.
beta : float or ndarray
King distribution beta parameter (tail weight).
Returns
-------
float or ndarray
Normalization constant(s) such that PDF integrates to 1.
"""
return _norm(alpha, beta, self.angular_cutoff)
[docs]
def pdf_from_norm(
self,
x: Union[float, npt.NDArray[np.floating]],
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
norm: Union[float, npt.NDArray[np.floating]],
) -> Union[float, npt.NDArray[np.floating]]:
"""
Evaluate the King kernel given a precomputed normalization constant.
Computes ``norm * unnormalized_pdf(x, alpha, beta)`` directly. No
validation of alpha, beta, or x is performed; the caller is
responsible for ensuring inputs are in-range.
Parameters
----------
x : float or ndarray
Angular separation(s) in radians. Must already be
``<= self.angular_cutoff``; this is NOT checked.
alpha : float or ndarray
King distribution alpha parameter (scale) in radians. Must
already be > 0; this is NOT checked.
beta : float or ndarray
King distribution beta parameter (tail weight). Must already
be > 1; this is NOT checked.
norm : float or ndarray
Precomputed normalization constant(s), e.g. from :meth:`norm`.
Returns
-------
float or ndarray
``norm * unnormalized_pdf(x, alpha, beta)``, broadcast over
inputs.
"""
return norm * _unnormalized_pdf(x, alpha, beta)
[docs]
def pdf(
self,
x: Union[float, npt.NDArray[np.floating]],
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
) -> Union[float, npt.NDArray[np.floating]]:
"""
Evaluate the normalized King PDF at given angular separation(s).
Returns zero for points beyond the angular cutoff. Handles broadcasting
of input arrays and masks invalid regions.
Parameters
----------
x : float or ndarray
Angular separation(s) from the source in radians.
alpha : float or ndarray
King distribution alpha parameter (scale) in radians.
beta : float or ndarray
King distribution beta parameter (tail weight).
Returns
-------
ndarray
Normalized PDF value(s) with units of probability/steradian.
"""
# Scalar-like: check if we can shortcut using the angular cutoff.
if np.isscalar(x) and (x > self.angular_cutoff): # type: ignore[operator]
return 0
elif isinstance(x, np.ndarray) and x.size == 1:
if float(x.flat[0]) > self.angular_cutoff:
return 0
if np.any(alpha <= 0):
raise ValueError("Received alpha <= 0. The King distribution is not defined here.")
if np.any(beta <= 1):
raise ValueError("Received beta <= 1. The King distribution is not defined here.")
# Broadcast
x, alpha, beta = np.broadcast_arrays(x, alpha, beta)
# And mask
normalized_pdf = np.zeros_like(x)
mask = x <= self.angular_cutoff
x, alpha, beta = x[mask], alpha[mask], beta[mask]
# Nope. Do the calculations.
norm = self.norm(alpha, beta)
unnormalized_pdf = _unnormalized_pdf(x, alpha, beta)
normalized_pdf[mask] = norm * unnormalized_pdf
return normalized_pdf
[docs]
def cdf(
self,
x: Union[float, npt.NDArray[np.floating]],
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
) -> Union[float, npt.NDArray[np.floating]]:
"""
Evaluate the normalized King CDF at given angular separation(s).
Returns 1 for points beyond the angular cutoff. Handles broadcasting
of input arrays and masks invalid regions.
Parameters
----------
x : float or ndarray
Angular separation(s) from the source in radians.
alpha : float or ndarray
King distribution alpha parameter (scale) in radians.
beta : float or ndarray
King distribution beta parameter (tail weight).
Returns
-------
ndarray
Normalized CDF value(s) (cumulative probability).
"""
# Scalar-like: check if we can shortcut using the angular cutoff.
if np.isscalar(x):
if x > self.angular_cutoff: # type: ignore[operator]
return 1
elif isinstance(x, np.ndarray) and x.size == 1:
if float(x.flat[0]) > self.angular_cutoff:
return 1
if np.any(alpha <= 0):
raise ValueError(
"Received alpha <= 0. The King distribution is onlydefined for alpha > 0."
)
if np.any(beta <= 1):
raise ValueError(
"Received beta <= 1. The King distribution is onlydefined for beta > 1."
)
# Broadcast
x, alpha, beta = np.broadcast_arrays(x, alpha, beta)
# And mask
normalized_cdf = np.ones_like(x)
mask = x <= self.angular_cutoff
x, alpha, beta = x[mask], alpha[mask], beta[mask]
# Nope. Do the calculations.
norm = self.norm(alpha, beta)
unnormalized_cdf = _unnormalized_cdf(x, alpha, beta)
normalized_cdf[mask] = norm * unnormalized_cdf
return normalized_cdf
[docs]
def sample(
self,
n: int,
alpha: float,
beta: float,
rng: Optional[np.random.Generator] = None,
n_grid: int = 10000,
) -> npt.NDArray[np.floating]:
"""
Sample angular separations from the King distribution via inverse CDF.
Parameters
----------
n : int
Number of samples to draw.
alpha : float
King distribution alpha parameter (scale) in radians.
beta : float
King distribution beta parameter (tail weight).
rng : np.random.Generator, optional
Random number generator. If None, uses np.random.default_rng().
n_grid : int, optional
Number of points in the CDF lookup grid. Higher values give more
accurate sampling at the cost of memory and setup time. Default
is 10000, which gives ~arcminute accuracy.
Returns
-------
ndarray
Angular separations in radians, shape (n,).
"""
if np.any(alpha <= 0):
raise ValueError("Received alpha <= 0. The PDF is not defined here.")
if np.any(beta <= 1):
raise ValueError("Received beta <= 1. The PDF is not defined here.")
if rng is None:
rng = np.random.default_rng()
psi_grid = np.linspace(1e-6, self.angular_cutoff, n_grid)
cdf_grid = cast(npt.NDArray[np.floating], self.cdf(psi_grid, alpha, beta))
return np.interp(rng.uniform(0, cdf_grid[-1], n), cdf_grid, psi_grid)
[docs]
def evaluate(
self,
source_ras: npt.NDArray[np.floating],
source_decs: npt.NDArray[np.floating],
event_ras: npt.NDArray[np.floating],
event_decs: npt.NDArray[np.floating],
alpha: npt.NDArray[np.floating],
beta: npt.NDArray[np.floating],
*,
mask: Optional[csr_array] = None,
) -> csr_array:
"""
Evaluate the King PDF for all (event, source) pairs and return a sparse matrix.
On the first call, identifies pairs within ``angular_cutoff`` via a
declination pre-filter and a full great-circle distance check, then
evaluates the King PDF for those pairs. On repeated calls with the same
event and source positions, pass the result of a previous call as
``mask`` to skip the masking step entirely and go straight to vectorized
PDF evaluation using the cached sparsity pattern.
Parameters
----------
source_ras : ndarray, shape (n_sources,)
Source right ascensions in radians.
source_decs : ndarray, shape (n_sources,)
Source declinations in radians.
event_ras : ndarray, shape (n_events,)
Reconstructed event right ascensions in radians.
event_decs : ndarray, shape (n_events,)
Reconstructed event declinations in radians.
alpha : ndarray, shape (n_events,)
Per-event King alpha parameter in radians.
beta : ndarray, shape (n_events,)
Per-event King beta parameter.
mask : csr_array, optional
Sparse array whose nonzero structure encodes the valid
(event, source) pairs. When provided, the angular-distance loop
is skipped and only the indexed pairs are evaluated. Pass the
result of a previous :meth:`evaluate` call to reuse the geometry.
Returns
-------
csr_array, shape (n_events, n_sources)
Sparse array of PDF values, indexed ``[event_index, source_index]``.
"""
source_ras = np.atleast_1d(np.asarray(source_ras, dtype=np.float64))
source_decs = np.atleast_1d(np.asarray(source_decs, dtype=np.float64))
event_ras = np.asarray(event_ras, dtype=np.float64)
event_decs = np.asarray(event_decs, dtype=np.float64)
alpha = np.asarray(alpha, dtype=np.float64)
beta = np.asarray(beta, dtype=np.float64)
n_events = len(event_ras)
n_sources = len(source_ras)
if mask is not None:
rows, cols = mask.nonzero()
psi = angular_distance(
source_ras[cols],
source_decs[cols],
event_ras[rows],
event_decs[rows],
)
vals = np.asarray(
self.pdf(psi, alpha[rows], beta[rows]),
dtype=np.float64,
)
nonzero = vals > 0.0
return csr_array(
(vals[nonzero], (rows[nonzero], cols[nonzero])),
shape=(n_events, n_sources),
dtype=np.float64,
)
row_chunks, col_chunks, val_chunks = [], [], []
for source_index, (src_ra, src_dec) in enumerate(zip(source_ras, source_decs)):
dec_mask = np.abs(event_decs - src_dec) <= self.angular_cutoff
candidate_indices = np.flatnonzero(dec_mask)
if len(candidate_indices) == 0:
continue
psi = angular_distance(
src_ra,
src_dec,
event_ras[candidate_indices],
event_decs[candidate_indices],
)
within_cutoff = psi <= self.angular_cutoff
event_indices = candidate_indices[within_cutoff]
if len(event_indices) == 0:
continue
vals = np.asarray(
self.pdf(psi[within_cutoff], alpha[event_indices], beta[event_indices]),
dtype=np.float64,
)
nonzero = vals > 0.0
row_chunks.append(event_indices[nonzero])
col_chunks.append(np.full(int(nonzero.sum()), source_index, dtype=np.intp))
val_chunks.append(vals[nonzero])
if not row_chunks:
return csr_array((n_events, n_sources), dtype=np.float64)
return csr_array(
(
np.concatenate(val_chunks),
(np.concatenate(row_chunks), np.concatenate(col_chunks)),
),
shape=(n_events, n_sources),
dtype=np.float64,
)
class MarginalizedKingPDF:
"""
King PDF pre-integrated over right ascension for signal-subtraction likelihoods.
Pre-computes a four-dimensional grid of RA-marginalized King PDF values over
(source_declination, log10(alpha), beta, dec_reco - source_declination) at
construction and evaluates it via trilinear interpolation at runtime.
:meth:`evaluate` returns a :class:`scipy.sparse.csr_array` of shape
``(n_events, n_sources)`` whose nonzero entries are the marginalized PDF
values for event–source pairs within ``angular_cutoff`` of each other.
A :class:`~kingmaker.pdf.KingPDF` instance is accessible via :attr:`king`
for sampling, CDF evaluation, and similar point-source operations.
Parameters
----------
source_declination : array-like
True source declination(s) in radians. **Required.**
angular_cutoff : float, optional
Maximum angular separation in radians. The RA integral is zero for
events farther than this from a source. Default is pi.
points_alpha : ndarray, optional
Alpha grid points in radians. Default: 30 log-spaced values from
0.05 degrees to pi.
points_beta : ndarray, optional
Beta grid points. Default: 20 log-spaced values from ~1.023 to 10.
Lower bound is kept above 1 to avoid float64 precision loss in the
normalization at beta -> 1.
n_signed_delta_dec : int, optional
Number of grid points in the signed declination-offset axis. Default: 200.
n_ra_bins : int, optional
Number of RA integration intervals over [0, pi]. Default: 100.
"""
def __init__(
self,
*,
source_declination: Union[list, npt.NDArray[np.floating]],
angular_cutoff: float = np.pi,
points_alpha: Optional[npt.NDArray[np.floating]] = None,
points_beta: Optional[npt.NDArray[np.floating]] = None,
n_signed_delta_dec: int = 200,
n_ra_bins: int = 100,
) -> None:
self.king = KingPDF(angular_cutoff=angular_cutoff)
self.angular_cutoff = angular_cutoff
self.source_declination = np.sort(
np.atleast_1d(np.asarray(source_declination, dtype=np.float64))
)
self._points_alpha = np.sort(
np.asarray(
points_alpha
if points_alpha is not None
else np.logspace(np.log10(np.radians(0.05)), _log10pi, 30),
dtype=np.float64,
)
)
self._points_beta = np.sort(
np.asarray(
# Lower bound is kept away from 1.0 to avoid float64 precision
# loss in _norm() at beta -> 1, which otherwise produces inf.
points_beta if points_beta is not None else np.logspace(0.01, 1, 20),
dtype=np.float64,
)
)
self._n_signed_delta_dec = n_signed_delta_dec
self._n_ra_bins = n_ra_bins
if np.any(self._points_alpha <= 0):
raise ValueError(
"points_alpha contains values <= 0. The King PDF is not defined in this region."
)
if np.any(self._points_beta <= 1):
raise ValueError(
"points_beta contains values <= 1. The King PDF is not defined in this region."
)
self._build_cache()
def _build_cache(self) -> None:
"""
Build the precomputed RA-marginalized PDF grid.
Fills a 4D grid over (source_declination, log10(alpha), beta,
signed_delta_dec), where signed_delta_dec = dec_reco - source_declination.
The grid and its coordinate axes are stored on the instance for use
by :meth:`pdf`.
"""
self._log10_points_alpha = np.log10(self._points_alpha)
# signed_delta_dec spans [-angular_cutoff, +angular_cutoff]; the PDF
# is zero outside this range regardless of source declination.
self._signed_delta_dec = np.linspace(
-self.angular_cutoff, self.angular_cutoff, self._n_signed_delta_dec
)
# RA integration nodes on [0, pi] (see _marginalize_ra for the
# symmetry argument that limits integration to this half).
ra_grid = np.linspace(0.0, np.pi, self._n_ra_bins + 1)
# Normalization depends only on (alpha, beta), not on source declination,
# so it's computed once here instead of inside the per-(dec, alpha, beta)
# numba loop.
alpha_mg, beta_mg = np.meshgrid(self._points_alpha, self._points_beta, indexing="ij")
norm_grid = cast(npt.NDArray[np.floating], self.king.norm(alpha_mg, beta_mg))
self._grid = _build_marginalized_grid(
self.source_declination,
self._points_alpha,
self._points_beta,
norm_grid.astype(np.float64),
float(self.angular_cutoff),
self._signed_delta_dec,
ra_grid,
)
# self._grid has shape (n_sources, n_alpha, n_beta, n_signed_delta_dec)
def pdf(
self,
x: Union[float, npt.NDArray[np.floating]],
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
source_dec: float,
) -> npt.NDArray[np.floating]:
"""
Evaluate the RA-marginalized King PDF for one source at given event declinations.
Mirrors :meth:`KingPDF.pdf` for the marginalized distribution: ``x`` is
the reconstructed event declination (not angular separation) and
``source_dec`` specifies the single source position. Returns a dense
array. Use :meth:`evaluate` for efficient batch evaluation over many
sources.
Parameters
----------
x : float or ndarray, shape (n_events,)
Reconstructed event declination(s) in radians.
alpha : float or ndarray, shape (n_events,)
Per-event King alpha parameter in radians.
beta : float or ndarray, shape (n_events,)
Per-event King beta parameter.
source_dec : float
True source declination in radians.
Returns
-------
ndarray, shape (n_events,)
Marginalized PDF values in rad⁻¹. Zero for events farther than
``angular_cutoff`` from ``source_dec`` in declination.
"""
x, alpha, beta = np.broadcast_arrays(
np.asarray(x, dtype=np.float64),
np.asarray(alpha, dtype=np.float64),
np.asarray(beta, dtype=np.float64),
)
x = np.atleast_1d(x)
alpha = np.atleast_1d(alpha)
beta = np.atleast_1d(beta)
signed_delta_dec = x - float(source_dec)
within = np.abs(signed_delta_dec) <= self.angular_cutoff
result = np.zeros(len(x), dtype=np.float64)
if not np.any(within):
return result
queries = np.column_stack(
[
np.full(int(within.sum()), float(source_dec)),
np.log10(alpha[within]),
beta[within],
signed_delta_dec[within],
]
)
result[within] = interpn(
(
self.source_declination,
self._log10_points_alpha,
self._points_beta,
self._signed_delta_dec,
),
self._grid,
queries,
method="linear",
bounds_error=False,
fill_value=0.0,
)
return result
def evaluate(
self,
source_decs: npt.NDArray[np.floating],
event_decs: npt.NDArray[np.floating],
alpha: npt.NDArray[np.floating],
beta: npt.NDArray[np.floating],
*,
mask: Optional[csr_array] = None,
) -> csr_array:
"""
Evaluate the RA-marginalized King PDF for every (event, source) pair.
Looks up values from the precomputed grid via trilinear interpolation
over (log10(alpha), beta, signed_delta_dec) at each source's declination,
where signed_delta_dec = event_dec - source_dec. On the first call,
identifies pairs within ``angular_cutoff`` via a declination offset
check. On repeated calls with the same event and source positions, pass
the result of a previous call as ``mask`` to skip the masking loop
entirely and go straight to a single vectorized interpolation.
Parameters
----------
source_decs : ndarray, shape (n_sources,)
True source declination(s) in radians.
event_decs : ndarray, shape (n_events,)
Reconstructed event declination(s) in radians.
alpha : ndarray, shape (n_events,)
Per-event King alpha parameter in radians.
beta : ndarray, shape (n_events,)
Per-event King beta parameter.
mask : csr_array, optional
Sparse array whose nonzero structure encodes the valid
(event, source) pairs. When provided, the masking loop is skipped
and only the indexed pairs are evaluated. Pass the result of a
previous :meth:`evaluate` call to reuse the geometry.
Returns
-------
csr_array, shape (n_events, n_sources)
Sparse array of marginalized PDF values, indexed
``[event_index, source_index]``.
"""
source_decs = np.atleast_1d(np.asarray(source_decs, dtype=np.float64))
event_decs = np.asarray(event_decs, dtype=np.float64)
alpha = np.asarray(alpha, dtype=np.float64)
beta = np.asarray(beta, dtype=np.float64)
n_events = len(event_decs)
n_sources = len(source_decs)
if mask is not None:
rows, cols = mask.nonzero()
signed_delta_dec = event_decs[rows] - source_decs[cols]
queries = np.column_stack(
[
source_decs[cols],
np.log10(alpha[rows]),
beta[rows],
signed_delta_dec,
]
)
values = interpn(
(
self.source_declination,
self._log10_points_alpha,
self._points_beta,
self._signed_delta_dec,
),
self._grid,
queries,
method="linear",
bounds_error=False,
fill_value=0.0,
)
return csr_array(
(values, (rows, cols)),
shape=(n_events, n_sources),
dtype=np.float64,
)
row_chunks, col_chunks, query_chunks = [], [], []
for source_index, source_dec in enumerate(source_decs):
signed_delta_dec = event_decs - source_dec
event_indices = np.flatnonzero(np.abs(signed_delta_dec) <= self.angular_cutoff)
if len(event_indices) == 0:
continue
row_chunks.append(event_indices)
col_chunks.append(np.full(len(event_indices), source_index, dtype=np.intp))
query_chunks.append(
np.column_stack(
[
np.full(len(event_indices), source_dec),
np.log10(alpha[event_indices]),
beta[event_indices],
signed_delta_dec[event_indices],
]
)
)
if not query_chunks:
return csr_array((n_events, n_sources), dtype=np.float64)
rows = np.concatenate(row_chunks)
cols = np.concatenate(col_chunks)
queries = np.vstack(query_chunks)
values = interpn(
(
self.source_declination,
self._log10_points_alpha,
self._points_beta,
self._signed_delta_dec,
),
self._grid,
queries,
method="linear",
bounds_error=False,
fill_value=0.0,
)
nonzero = values > 0.0
return csr_array(
(values[nonzero], (rows[nonzero], cols[nonzero])),
shape=(n_events, n_sources),
dtype=np.float64,
)
[docs]
class TemplateSmearedKingPDF(KingPDF):
"""
King PDF convolved with a HEALPix template map using spherical harmonics.
Uses spherical harmonic decomposition to efficiently convolve a King PSF
with a template skymap. Pre-computes template harmonics for fast evaluation.
Parameters
----------
skymap : ndarray
HEALPix map to convolve with King PDF. Will be normalized to integrate to 1.
eval_decs : float or ndarray
Declination(s) in radians where PDF will be evaluated.
eval_ras : float or ndarray
Right ascension(s) in radians where PDF will be evaluated.
angular_cutoff : float, optional
Maximum angular separation in radians. Default is pi.
points_alpha : ndarray, optional
Grid of alpha values for normalization interpolation.
points_beta : ndarray, optional
Grid of beta values for normalization interpolation.
lmax : int, optional
Maximum spherical harmonic degree. Default is 3*nside-1.
interpolation_method : {"nearest", "linear"}, optional
Method used in get_king_b_l to look up b_l coefficients from the
precomputed grid. "nearest" (default) snaps to the closest grid point,
returning one unique b_l vector per distinct grid cell limiting the
number of maps generated and improving efficiency. The "linear" option
bilinearly interpolates in log(alpha), log(beta) space.
memory_limit_gb : float, optional
Memory budget in GB for the sph_harm_y_all array allocated in
set_coordinates. Points are processed in batches sized so that each
batch stays within this limit. Default is 1.0 GB. At nside=256
(lmax=767) each point costs ~9.4 MB, so the default allows ~100 points
per batch; at nside=512 (lmax=1535) each point costs ~37.7 MB, so the
default allows ~26 points per batch.
"""
skymap: npt.NDArray[np.floating]
bl_grid: npt.NDArray[np.floating]
interpolation_method: str
eval_decs: npt.NDArray[np.floating] = np.array([], dtype=np.float32)
eval_ras: npt.NDArray[np.floating] = np.array([], dtype=np.float32)
[docs]
def __init__(
self,
skymap: npt.NDArray[np.floating],
*,
eval_decs: Optional[Union[float, npt.NDArray[np.floating]]] = None,
eval_ras: Optional[Union[float, npt.NDArray[np.floating]]] = None,
angular_cutoff: float = np.pi,
points_alpha: npt.NDArray[np.floating] = np.logspace(-4, _log10pi + 1e-2, 100),
points_beta: npt.NDArray[np.floating] = np.nextafter(np.logspace(0, 1, 100), np.inf),
lmax: Optional[int] = None,
interpolation_method: str = "nearest",
memory_limit_gb: float = 1.0,
) -> None:
if interpolation_method not in ("nearest", "linear"):
raise ValueError(
f"interpolation_method must be 'nearest' or 'linear', got {interpolation_method!r}"
)
self.interpolation_method = interpolation_method
self.memory_limit_bytes = int(memory_limit_gb * 1e9)
super().__init__(angular_cutoff=angular_cutoff)
if np.any(points_alpha <= 0):
raise ValueError(
"Received points_alpha containing at least one point <= 0. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 0.0 when passing them into TemplateSmearedKingPDF."
)
if np.any(points_beta <= 1):
raise ValueError(
"Received points_beta containing at least one point <= 1. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 1.0 when passing them into TemplateSmearedKingPDF."
)
self.points_alpha = points_alpha
self.points_beta = points_beta
self.log10_points_alpha = np.log10(self.points_alpha)
self.log10_points_beta = np.log10(self.points_beta)
self.nside = hp.npix2nside(len(skymap))
self.lmax = (3 * self.nside - 1) if lmax is None else lmax
self.mmax = self.lmax
# While we're here, normalize the skymap so it integrates to 1.
# Then calculate the alm coefficients needed for convolution.
self.skymap = skymap
normalized_skymap = skymap / skymap.sum() / hp.nside2pixarea(self.nside)
self.skymap_alm = hp.map2alm(normalized_skymap, lmax=self.lmax, mmax=self.mmax, iter=1)
# Precompute the spherical harmonic indices needed for convolution.
# hp.Alm.getlm builds (ls, ms) in C, avoiding a Python loop over lmax+1.
self.ls, self.ms = hp.Alm.getlm(self.lmax)
self._alm_indices = hp.Alm.getidx(self.lmax, self.ls, self.ms)
# Precompute weighted alm = factors * a_lm once at init so set_coordinates
# doesn't recompute them per call. Sort by l so np.add.reduceat can sum
# contributions per degree without Python-level scatter (np.add.at).
a_lm_flat = self.skymap_alm[self._alm_indices]
factors = np.where(self.ms == 0, 1.0, 2.0)
weighted_alm = factors * a_lm_flat
sort_idx = np.argsort(self.ls, kind="stable")
self.ls_sorted = self.ls[sort_idx]
self.ms_sorted = self.ms[sort_idx]
self.weighted_alm_sorted = weighted_alm[sort_idx]
self.l_starts = np.searchsorted(self.ls_sorted, np.arange(self.lmax + 1))
# Set up the evaluation coordinates
if eval_decs is not None and eval_ras is not None:
self.set_coordinates(eval_decs, eval_ras)
# Pre-generate the Legendre polynomials needed for b_l calculations.
# The theta grid is log-spaced so it resolves the PSF core accurately
# down to ~0.0057 degrees (1e-4 rad), well below IceCube's resolution.
self.theta_grid = np.concatenate(
[[0.0], np.logspace(-4, np.log10(self.angular_cutoff), 1000)]
)
P_all = legendre_p_all(self.lmax, np.cos(self.theta_grid))[0]
self.P_all = P_all[: self.lmax + 1]
self.bl_grid = self.precompute_bl_grid()
[docs]
def set_coordinates(
self,
eval_decs: Union[float, npt.NDArray[np.floating]],
eval_ras: Union[float, npt.NDArray[np.floating]],
) -> None:
"""
Set evaluation coordinates and pre-compute spherical harmonics.
Parameters
----------
eval_decs : float or ndarray
Declination(s) in radians.
eval_ras : float or ndarray
Right ascension(s) in radians.
"""
eval_decs = np.atleast_1d(eval_decs)
eval_ras = np.atleast_1d(eval_ras)
# If there's a different number of declination and RA points,
# there's something wrong.
if np.atleast_1d(eval_decs).shape != eval_ras.shape:
raise RuntimeError(
"TemplateSmearedKingPDF::set_coordinates received different numbers"
f" of declination values ({np.atleast_1d(eval_decs).shape}) and"
f" right ascension values ({np.atleast_1d(eval_ras).shape})."
" These need to match since each is assumed to be one source."
)
# If the coordinates match what we already have, do nothing.
if (eval_decs.size == 0) or (
np.all(np.equal(self.eval_decs.shape, eval_decs.shape))
and np.all(np.equal(self.eval_decs, eval_decs))
and np.all(np.equal(self.eval_ras, eval_ras))
):
return
self.eval_decs = np.atleast_1d(eval_decs)
self.eval_ras = np.atleast_1d(eval_ras)
assert self.eval_decs.shape == self.eval_ras.shape
npts = len(self.eval_decs)
# sph_harm_y_all returns shape (lmax+1, 2*mmax+1, batch) in complex128.
# Batch over points so the raw array stays within memory_limit_bytes.
bytes_per_point = np.complex128().nbytes * (self.lmax + 1) * (2 * self.mmax + 1)
batch_size = max(1, self.memory_limit_bytes // bytes_per_point)
self._c_l = np.zeros((self.lmax + 1, npts), dtype=np.float64)
for start in range(0, npts, batch_size):
end = min(start + batch_size, npts)
raw = sph_harm_y_all(
self.lmax,
self.mmax,
np.pi / 2 - self.eval_decs[start:end],
self.eval_ras[start:end],
)
# Sum contributions per degree using reduceat over l-sorted alm order.
Y_lm_sorted = raw[self.ls_sorted, self.ms_sorted, :] # (nalm, batch)
contribs = np.real(self.weighted_alm_sorted[:, None] * Y_lm_sorted) # (nalm, batch)
self._c_l[:, start:end] = np.add.reduceat(contribs, self.l_starts, axis=0)
return
[docs]
def skymap_to_alm(self) -> npt.NDArray[np.complexfloating]:
"""
Convert the HEALPix skymap to spherical harmonic coefficients.
Returns
-------
ndarray
Complex spherical harmonic coefficients a_lm.
"""
return hp.map2alm(self.skymap, lmax=self.lmax, mmax=self.mmax)
[docs]
def precompute_bl_grid(self) -> npt.NDArray[np.floating]:
"""
Precompute b_l coefficients for all (alpha, beta) grid points via matmul.
Evaluates the King PDF over the full (n_alpha, n_beta, n_theta) parameter
grid, then computes all b_l integrals in a single matrix multiply.
Peak memory scales as O(n_alpha * n_beta * n_theta).
Returns
-------
ndarray, shape (n_alpha, n_beta, lmax + 1)
Spherical harmonic b_l coefficients for each (alpha, beta) grid point,
stored in interpn-ready axis order.
"""
n_alpha = len(self.points_alpha)
n_beta = len(self.points_beta)
theta = self.theta_grid
n_theta = len(theta)
# Evaluate the King PDF over the full (n_alpha, n_beta, n_theta) grid.
# Compute normalisation constants once per (alpha, beta) pair and broadcast
# over theta, avoiding n_theta redundant norm lookups per grid point.
alpha_grid, beta_grid = np.meshgrid(self.points_alpha, self.points_beta, indexing="ij")
norms = self.norm(alpha_grid, beta_grid)
unnorm = _unnormalized_pdf(
theta[None, None, :], alpha_grid[:, :, None], beta_grid[:, :, None]
) # (n_alpha, n_beta, n_theta)
pdf_vals = norms[:, :, None] * unnorm
# Trapezoid quadrature weights for the non-uniform log-spaced theta grid.
w = np.empty(n_theta)
w[1:-1] = (theta[2:] - theta[:-2]) / 2
w[0] = (theta[1] - theta[0]) / 2
w[-1] = (theta[-1] - theta[-2]) / 2
# Absorb 2pi * sin(theta) * w into P_all once to form P_weighted, then
# use a single BLAS matmul instead of lmax+1 separate trapezoid calls.
P_weighted = self.P_all * (2 * np.pi * np.sin(theta) * w) # (lmax+1, n_theta)
pdf_flat = pdf_vals.reshape(n_alpha * n_beta, n_theta)
bl_flat = P_weighted @ pdf_flat.T # (lmax+1, n_alpha * n_beta)
# Store as (n_alpha, n_beta, lmax+1) so interpn can use it directly
# without a transpose on every get_king_b_l call.
return bl_flat.reshape(self.lmax + 1, n_alpha, n_beta).transpose(1, 2, 0)
[docs]
def get_king_b_l(self, alpha: float, beta: float) -> npt.NDArray[np.floating]:
"""
Return spherical harmonic expansion coefficients b_l for the King PDF.
Looks up b_l values from the precomputed grid (see precompute_bl_grid)
using the interpolation method selected at initialization.
"nearest" snaps to the closest (alpha, beta) grid point in log space,
so events that fall in the same grid cell reuse the same b_l vector
without triggering a new map convolution. "linear" bilinearly
interpolates in log(alpha), log(beta) space for smoother variation.
Parameters
----------
alpha : float
King distribution alpha parameter (scale) in radians.
beta : float
King distribution beta parameter (tail weight).
Returns
-------
ndarray
Spherical harmonic coefficients b_l for degrees 0 to lmax.
"""
if np.any(alpha <= 0):
raise ValueError(
"Received alpha containing at least one point <= 0. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 0.0 when passing them into TemplateSmearedKingPDF."
)
if np.any(beta <= 1):
raise ValueError(
"Received beta containing at least one point <= 1. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 1.0 when passing them into TemplateSmearedKingPDF."
)
log_a = np.log10(alpha)
log_b = np.log10(beta)
if self.interpolation_method == "nearest":
i = np.searchsorted(self.log10_points_alpha, log_a)
j = np.searchsorted(self.log10_points_beta, log_b)
i = np.clip(i, 1, len(self.log10_points_alpha) - 1)
j = np.clip(j, 1, len(self.log10_points_beta) - 1)
# Adjust i and j to snap to the nearest grid point in log space
# since searchsorted gives index of the right edge of the bin.
if log_a - self.log10_points_alpha[i - 1] < self.log10_points_alpha[i] - log_a:
i -= 1
if log_b - self.log10_points_beta[j - 1] < self.log10_points_beta[j] - log_b:
j -= 1
return self.bl_grid[i, j]
# "linear"
result = interpn(
(self.log10_points_alpha, self.log10_points_beta),
self.bl_grid, # (n_alpha, n_beta, lmax+1)
np.array([[log_a, log_b]]),
method="linear",
bounds_error=True,
)
return result[0]
[docs]
def convolve_map(self, alpha: float, beta: float) -> npt.NDArray[np.floating]:
"""
Convolve the template skymap with a King PSF and return full HEALPix map.
Parameters
----------
alpha : float
King distribution alpha parameter (scale) in radians.
beta : float
King distribution beta parameter (tail weight).
Returns
-------
ndarray
Convolved HEALPix skymap at the same resolution as input.
"""
if np.any(alpha <= 0):
raise ValueError(
"Received alpha containing at least one point <= 0. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 0.0 when passing them into TemplateSmearedKingPDF."
)
if np.any(beta <= 1):
raise ValueError(
"Received beta containing at least one point <= 1. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 1.0 when passing them into TemplateSmearedKingPDF."
)
b_l = self.get_king_b_l(alpha, beta)
harmonic_convolution = hp.almxfl(alm=self.skymap_alm, fl=b_l, mmax=self.mmax, inplace=False)
return hp.alm2map(harmonic_convolution, nside=self.nside, lmax=self.lmax, mmax=self.mmax)
[docs]
def convolve_at_grid_point(
self, alpha: float, beta: float
) -> Union[float, npt.NDArray[np.floating]]:
"""
Evaluate convolved PDF only at pre-set grid points (eval_decs, eval_ras).
Uses pre-computed spherical harmonics from set_coordinates().
Parameters
----------
alpha : float
King distribution alpha parameter (scale) in radians.
beta : float
King distribution beta parameter (tail weight).
Returns
-------
float or ndarray
Convolved PDF value(s) at evaluation coordinates.
"""
if np.any(alpha <= 0):
raise ValueError(
"Received alpha containing at least one point <= 0. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 0.0 when passing them into TemplateSmearedKingPDF."
)
if np.any(beta <= 1):
raise ValueError(
"Received beta containing at least one point <= 1. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 1.0 when passing them into TemplateSmearedKingPDF."
)
b_l = self.get_king_b_l(alpha, beta)
return b_l @ self._c_l
[docs]
def sample(
self,
n: int,
alpha: float,
beta: float,
rng: Optional[np.random.Generator] = None,
n_grid: int = 10000,
) -> Tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
"""
Sample reconstructed positions from the PSF-convolved template skymap.
Convolves the template with the King PSF in harmonic space, then draws
pixel indices weighted by the convolved map values. This is equivalent
to drawing a true position from the template and applying a King PSF
offset, but is more efficient because the convolution is done once for
the whole map rather than per-event.
Parameters
----------
n : int
Number of samples to draw.
alpha : float
King distribution alpha parameter (scale) in radians.
beta : float
King distribution beta parameter (tail weight).
rng : np.random.Generator, optional
Random number generator. If None, uses np.random.default_rng().
n_grid : int, optional
Unused. Sampling draws directly from pixel weights of the convolved
map.
Returns
-------
reco_ra : ndarray
Reconstructed right ascension values in radians, shape (n,).
reco_dec : ndarray
Reconstructed declination values in radians, shape (n,).
Notes
-----
Samples land at HEALPix pixel centres. The positional resolution is
therefore limited by the skymap pixelisation (~`hp.nside2resol(nside)`).
"""
if np.any(alpha <= 0):
raise ValueError(
"Received alpha containing at least one point <= 0. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 0.0 when passing them into TemplateSmearedKingPDF."
)
if np.any(beta <= 1):
raise ValueError(
"Received beta containing at least one point <= 1. The"
" KingPDF isn't defined in this region. Ensure your points are"
" all above 1.0 when passing them into TemplateSmearedKingPDF."
)
if rng is None:
rng = np.random.default_rng()
convolved = self.convolve_map(alpha, beta)
weights = np.maximum(convolved, 0.0)
pixel_indices = rng.choice(len(convolved), size=n, p=weights / weights.sum())
colatitude, longitude = hp.pix2ang(self.nside, pixel_indices)
return longitude, np.pi / 2 - colatitude # reco_ra, reco_dec
class ExtendedSourceKingPDF:
"""
King PSF convolved with a Rayleigh (Gaussian) source extension.
Precomputes a 4D lookup table of convolved PDF values over
(log10(alpha), log10(beta), log10(extension), psi) using Gauss-Laguerre
quadrature on the inverse-gamma scale mixture representation of the King
distribution. At runtime, evaluates each event via quadrilinear
interpolation into this table.
Both the King PSF and the Rayleigh extension are axially symmetric, so the
convolution depends only on the scalar angular distance psi between the
event and the source — not on the full sky coordinates of either.
Parameters
----------
angular_cutoff : float, optional
Maximum angular separation in radians. Default is pi.
maximum_sigma : float, optional
Number of source-extension radii beyond the King angular_cutoff
that ``evaluate()`` considers for each source. For a source with
extension r₀, events at angular distance greater than
``maximum_sigma * r₀ + angular_cutoff`` are skipped. Default is 3.
points_alpha : ndarray, optional
Alpha grid points in radians. Default: 30 log-spaced values from
0.05 degrees to pi.
points_beta : ndarray, optional
Beta grid points. Default: 20 log-spaced values from ~1.023 to 10.
points_extension : ndarray, optional
Extension radius grid points in radians. Default: 20 log-spaced values
from 0.05 degrees to 5 degrees.
points_psi : ndarray, optional
Angular separation grid points in radians. Default: 0 followed by 500
log-spaced values from 1e-4 rad to angular_cutoff.
n_quad : int, optional
Number of Gauss-Laguerre quadrature nodes for the scale mixture
integral. Default is 32.
"""
def __init__(
self,
*,
angular_cutoff: float = np.pi,
maximum_sigma: float = 3.0,
points_alpha: Optional[npt.NDArray[np.floating]] = None,
points_beta: Optional[npt.NDArray[np.floating]] = None,
points_extension: Optional[npt.NDArray[np.floating]] = None,
points_psi: Optional[npt.NDArray[np.floating]] = None,
n_quad: int = 32,
) -> None:
self.angular_cutoff = float(angular_cutoff)
self.maximum_sigma = float(maximum_sigma)
self.n_quad = n_quad
self._points_alpha = np.sort(
np.asarray(
points_alpha
if points_alpha is not None
else np.logspace(np.log10(np.radians(0.05)), _log10pi, 30),
dtype=np.float64,
)
)
self._points_beta = np.sort(
np.asarray(
points_beta if points_beta is not None else np.logspace(0.01, 1, 20),
dtype=np.float64,
)
)
self._points_extension = np.sort(
np.asarray(
points_extension
if points_extension is not None
else np.logspace(np.log10(np.radians(0.05)), np.log10(np.radians(5.0)), 20),
dtype=np.float64,
)
)
# The psi table must reach the widest possible search window so that
# interpn stays in-bounds for all events accepted by evaluate().
# That window is maximum_sigma * max_ext + angular_cutoff, capped at π.
_psi_max = min(
self.maximum_sigma * self._points_extension[-1] + angular_cutoff,
np.pi,
)
self._points_psi = np.sort(
np.asarray(
points_psi
if points_psi is not None
else np.concatenate([[0.0], np.logspace(-4, np.log10(_psi_max), 500)]),
dtype=np.float64,
)
)
if np.any(self._points_alpha <= 0):
raise ValueError(
"points_alpha contains values <= 0. The King distribution is not defined here."
)
if np.any(self._points_beta <= 1):
raise ValueError(
"points_beta contains values <= 1. The King distribution is not defined here."
)
if np.any(self._points_extension <= 0):
raise ValueError(
"points_extension contains values <= 0. Extension radius must be positive."
)
if np.any(self._points_extension > np.radians(5.0) + 1e-12):
raise ValueError(
"points_extension contains values > 5 degrees. The flat-sky (Rayleigh) "
"approximation error exceeds ~0.5% at this scale; use a vMF extension instead."
)
self._log10_points_alpha = np.log10(self._points_alpha)
self._log10_points_beta = np.log10(self._points_beta)
self._log10_points_extension = np.log10(self._points_extension)
# Gauss-Laguerre nodes and weights for the scale mixture integral.
# The substitution t = beta*alpha^2 / v^2 maps the InvGamma weight
# exp(-beta*alpha^2/v^2) to the standard Gauss-Laguerre form e^{-t}.
self._quad_nodes, self._quad_weights = laggauss(n_quad)
self._build_table()
def _scale_mixture_pdf(
self,
psi: npt.NDArray[np.floating],
alpha: npt.NDArray[np.floating],
beta: npt.NDArray[np.floating],
extension: float,
) -> npt.NDArray[np.floating]:
"""
Evaluate the King–Rayleigh convolution via Gauss-Laguerre quadrature.
Derivation
----------
Consider integrating a Rayleigh PDF over an unknown scale v^2, weighted
by an InvGamma(kappa, c) density:
integral_0^inf [psi/v^2 * exp(-psi^2/(2v^2))] <- Rayleigh
* [c^kappa/Gamma(kappa) * (v^2)^{-kappa-1} * exp(-c/v^2)]
dv^2
The two exponentials combine: exp(-psi^2/(2v^2)) * exp(-c/v^2)
= exp(-(psi^2/2 + c)/v^2). Collecting powers of v^2 gives an integrand
proportional to (v^2)^{-(kappa+2)} * exp(-A/v^2) with A = c + psi^2/2.
That integral is a standard gamma-function result: Gamma(kappa+1)/A^{kappa+1}.
Substituting back:
result = psi * kappa * c^kappa / (c + psi^2/2)^{kappa+1}
= psi/c * kappa / (1 + psi^2/(2c))^{kappa+1}
Matching to the flat-sky King PDF psi/alpha^2 * (1-1/beta)
* (1 + psi^2/(2*beta*alpha^2))^{-beta} requires kappa = beta-1 and
c = beta*alpha^2. The King distribution is therefore exactly equal
to an InvGamma-weighted integral of Rayleigh distributions.
For the convolution with a Rayleigh source extension of radius r0: the
event position is the vector sum of an independent PSF displacement
(Rayleigh with scale v^2) and a source-extent displacement (Rayleigh
with scale r0^2). Adding two independent 2D Gaussian displacements
produces a 2D Gaussian with combined scale v^2 + r0^2. Replacing v^2
with v^2 + r0^2 inside the integral above gives the convolution.
The substitution t = c/v^2 (so v^2 = c/t) maps exp(-c/v^2) to exp(-t),
putting the integral in standard Gauss-Laguerre form
integral_0^inf f(t) * e^{-t} dt, evaluated here with fixed nodes and
weights from laggauss(n_quad). After the substitution the integral
becomes:
p_conv = psi / Gamma(beta-1)
* integral_0^inf t^(beta-1) / (c + r0^2 * t)
* exp(-psi^2 * t / (2*(c + r0^2*t)))
* e^{-t} dt
Setting r0 = 0 recovers the flat-sky King PDF exactly. Uses psi^2/2
throughout rather than 1 - cos(psi); error is below 0.5% for psi and
r0 below 5 degrees.
Parameters
----------
psi : ndarray
Angular separation(s) from the source in radians.
alpha : ndarray
King alpha parameter(s) in radians.
beta : ndarray
King beta parameter(s). Must be > 1.
extension : float
Rayleigh source extension radius in radians. Must be <= 5 degrees.
Returns
-------
ndarray
Convolved PDF values, same shape as the broadcast of inputs.
Zero wherever psi = 0.
Raises
------
NotImplementedError
If extension exceeds 5 degrees (np.radians(5)), where the
flat-sky approximation error exceeds ~0.5% and a vMF extension
should be used instead.
"""
if float(extension) > np.radians(5.0) + 1e-12:
raise NotImplementedError(
f"extension={np.degrees(float(extension)):.2f} deg exceeds the 5-degree "
"limit for the flat-sky (Rayleigh) approximation. "
"A von Mises-Fisher extension needs to be implemented "
" for larger angular scales."
)
psi, alpha, beta = np.broadcast_arrays(
np.asarray(psi, dtype=np.float64),
np.asarray(alpha, dtype=np.float64),
np.asarray(beta, dtype=np.float64),
)
r0_sq = float(extension) ** 2
c = beta * alpha**2 # InvGamma scale: beta * alpha^2
# Broadcast data arrays against the quadrature axis
t = self._quad_nodes # (n_quad,)
w = self._quad_weights # (n_quad,)
c_q = c[..., np.newaxis] # (..., 1)
psi_q = psi[..., np.newaxis] # (..., 1)
beta_q = beta[..., np.newaxis] # (..., 1)
denom = c_q + r0_sq * t # (..., n_quad); always positive
# t^(beta-1) via log to stay in float64 range for large beta or t
t_pow = np.exp((beta_q - 1.0) * np.log(t)) # (..., n_quad)
integrand = psi_q * t_pow / denom * np.exp(-(psi_q**2) * t / (2.0 * denom))
# (..., n_quad); naturally zero when psi = 0
quad_sum = (w * integrand).sum(axis=-1) # (...)
return quad_sum / np.exp(gammaln(beta - 1.0))
def _build_table(self) -> None:
"""
Precompute the convolved PDF on the (alpha, beta, extension, psi) grid.
Evaluates _scale_mixture_pdf over the full four-dimensional parameter
space by looping over extension values (keeping one (n_alpha, n_beta,
n_psi) slice in memory at a time) and stacking the results. Stores the
result as self._table with shape (n_alpha, n_beta, n_extension, n_psi)
for use by interpn at runtime.
"""
# Broadcast axes over (alpha, beta, psi) for each extension slice.
# Shape annotations assume n_alpha, n_beta, n_psi grid sizes.
alpha_g = self._points_alpha[:, np.newaxis, np.newaxis] # (n_alpha, 1, 1)
beta_g = self._points_beta[np.newaxis, :, np.newaxis] # (1, n_beta, 1)
psi_g = self._points_psi[np.newaxis, np.newaxis, :] # (1, 1, n_psi)
slices = [
self._scale_mixture_pdf(psi_g, alpha_g, beta_g, float(ext))
for ext in self._points_extension
] # each element: (n_alpha, n_beta, n_psi)
self._table = np.stack(slices, axis=2) # (n_alpha, n_beta, n_extension, n_psi)
def pdf(
self,
x: Union[float, npt.NDArray[np.floating]],
alpha: Union[float, npt.NDArray[np.floating]],
beta: Union[float, npt.NDArray[np.floating]],
extension: Union[float, npt.NDArray[np.floating]],
) -> npt.NDArray[np.floating]:
"""
Evaluate the convolved PDF at angular separation(s) x.
Parameters
----------
x : float or ndarray
Angular separation(s) from the source in radians.
alpha : float or ndarray
Per-event King alpha parameter in radians.
beta : float or ndarray
Per-event King beta parameter.
extension : float or ndarray
Source extension radius in radians.
Returns
-------
ndarray
Convolved PDF values in probability/steradian.
"""
x = np.asarray(x, dtype=np.float64)
alpha = np.asarray(alpha, dtype=np.float64)
beta = np.asarray(beta, dtype=np.float64)
extension = np.asarray(extension, dtype=np.float64)
x, alpha, beta, extension = np.broadcast_arrays(x, alpha, beta, extension)
shape = x.shape
x = np.atleast_1d(x).ravel()
alpha = np.atleast_1d(alpha).ravel()
beta = np.atleast_1d(beta).ravel()
extension = np.atleast_1d(extension).ravel()
result = np.zeros(len(x))
in_bounds = x <= self._points_psi[-1]
if np.any(in_bounds):
queries = np.column_stack(
[
np.log10(alpha[in_bounds]),
np.log10(beta[in_bounds]),
np.log10(extension[in_bounds]),
x[in_bounds],
]
)
# _table stores the 1D radial density f(ψ): ∫₀^∞ f dψ = 1.
# Per-steradian density: p(ψ) = f(ψ) / (2π ψ) [flat-sky: dΩ = 2π ψ dψ].
f_psi = interpn(
(
self._log10_points_alpha,
self._log10_points_beta,
self._log10_points_extension,
self._points_psi,
),
self._table,
queries,
method="linear",
bounds_error=True,
)
x_in = x[in_bounds]
with np.errstate(invalid="ignore", divide="ignore"):
result[in_bounds] = np.where(x_in > 0.0, f_psi / (2.0 * np.pi * x_in), 0.0)
return result.reshape(shape)
def evaluate(
self,
source_ras: npt.NDArray[np.floating],
source_decs: npt.NDArray[np.floating],
source_extensions: npt.NDArray[np.floating],
event_ras: npt.NDArray[np.floating],
event_decs: npt.NDArray[np.floating],
alpha: npt.NDArray[np.floating],
beta: npt.NDArray[np.floating],
*,
mask: Optional[csr_array] = None,
) -> csr_array:
"""
Evaluate the extended-source convolved PDF for all (event, source) pairs.
Iterates over sources, applies a declination pre-filter and a full
great-circle distance check to identify pairs within ``angular_cutoff``,
then evaluates the convolved PDF for those pairs using ``pdf()``. On
repeated calls where the source and event positions are unchanged, pass
the result of a previous call as ``mask`` to skip the masking loop and
go straight to vectorized PDF evaluation.
Parameters
----------
source_ras : ndarray, shape (n_sources,)
Source right ascensions in radians.
source_decs : ndarray, shape (n_sources,)
Source declinations in radians.
source_extensions : ndarray, shape (n_sources,)
Per-source angular extension radii in radians. Must be within the
range covered by ``points_extension`` provided at construction.
event_ras : ndarray, shape (n_events,)
Reconstructed event right ascensions in radians.
event_decs : ndarray, shape (n_events,)
Reconstructed event declinations in radians.
alpha : ndarray, shape (n_events,)
Per-event King alpha parameter in radians.
beta : ndarray, shape (n_events,)
Per-event King beta parameter.
mask : csr_array, optional
Sparse array whose nonzero structure encodes the valid
(event, source) pairs. When provided, the masking loop is skipped
and only the indexed pairs are evaluated. Pass the result of a
previous :meth:`evaluate` call to reuse the geometry.
Returns
-------
csr_array, shape (n_events, n_sources)
Sparse array of convolved PDF values in probability/steradian,
indexed ``[event_index, source_index]``.
"""
source_ras = np.atleast_1d(np.asarray(source_ras, dtype=np.float64))
source_decs = np.atleast_1d(np.asarray(source_decs, dtype=np.float64))
source_extensions = np.atleast_1d(np.asarray(source_extensions, dtype=np.float64))
event_ras = np.asarray(event_ras, dtype=np.float64)
event_decs = np.asarray(event_decs, dtype=np.float64)
alpha = np.asarray(alpha, dtype=np.float64)
beta = np.asarray(beta, dtype=np.float64)
n_events = len(event_ras)
n_sources = len(source_ras)
if mask is not None:
rows, cols = mask.nonzero()
psi = angular_distance(
source_ras[cols],
source_decs[cols],
event_ras[rows],
event_decs[rows],
)
vals = self.pdf(psi, alpha[rows], beta[rows], source_extensions[cols])
nonzero = vals > 0.0
return csr_array(
(vals[nonzero], (rows[nonzero], cols[nonzero])),
shape=(n_events, n_sources),
dtype=np.float64,
)
row_chunks, col_chunks, val_chunks = [], [], []
for source_index, (src_ra, src_dec, src_ext) in enumerate(
zip(source_ras, source_decs, source_extensions)
):
radius = min(
self.maximum_sigma * src_ext + self.angular_cutoff,
self._points_psi[-1],
)
dec_mask = np.abs(event_decs - src_dec) <= radius
candidate_indices = np.flatnonzero(dec_mask)
if len(candidate_indices) == 0:
continue
psi = angular_distance(
src_ra,
src_dec,
event_ras[candidate_indices],
event_decs[candidate_indices],
)
within_cutoff = psi <= radius
event_indices = candidate_indices[within_cutoff]
if len(event_indices) == 0:
continue
vals = self.pdf(
psi[within_cutoff],
alpha[event_indices],
beta[event_indices],
np.full(int(within_cutoff.sum()), src_ext),
)
nonzero = vals > 0.0
row_chunks.append(event_indices[nonzero])
col_chunks.append(np.full(int(nonzero.sum()), source_index, dtype=np.intp))
val_chunks.append(vals[nonzero])
if not row_chunks:
return csr_array((n_events, n_sources), dtype=np.float64)
return csr_array(
(
np.concatenate(val_chunks),
(np.concatenate(row_chunks), np.concatenate(col_chunks)),
),
shape=(n_events, n_sources),
dtype=np.float64,
)