from typing import Any, Dict, List, Optional, Tuple, Union, cast
from tqdm import tqdm
import numpy as np
import numpy.typing as npt
from scipy.optimize import minimize
from .distribution import _cdf_and_gradient
from .pdf import KingPDF
from .utils import angular_distance
[docs]
class KingPSFFitter:
"""
Fit King PSF parameters to simulated signal events in arbitrary dimensions.
This class bins signal Monte Carlo events along user-specified observables
and fits King distribution parameters (alpha, beta) to the angular error
distribution in each bin.
Parameters
----------
signal_events : structured array
Numpy structured array containing signal MC events. Must include:
- 'ra', 'dec': reconstructed coordinates (radians)
- 'true_ra', 'true_dec': true coordinates (radians)
Additional fields can be used for parameterization binning.
parametrization_bins : dict
Dictionary mapping observable names to bin edges or number of bins.
Keys must correspond to fields in signal_events.
Values can be:
- int: number of equal-probability bins
- array-like: explicit bin edges
dpsi_nbins : int, optional
Number of bins in angular error (dpsi) for fitting. Default is 101.
minimum_counts : int, optional
Minimum number of events required in a bin for fitting. Default is 100.
remove_weight_outliers : bool
If True, remove events with weights beyond the bounds of weight_outlier_percentiles
(calculated per parametrization bin) from the fit. Used to stabilize fits by removing
weighting outliers. Default is True.
weight_outlier_percentiles : list, optional
The percentiles (ranging from 0-100) defining the range of weights to accept
for the per-parametrization bin histogramming and fitting. Note that these are
applied based on the weights based on sorted index value and not cumulative
weight value like np.percentile. Default is [0, 95],
weight_field : str, optional
Field name for oneweight. If None, equal weights are used.
true_ra_name : str
Field name for the true value of the signal events' right ascension.
true_dec_name : str
Field name for the true value of the signal events' declination.
true_energy_name : str
Field name for the true value of the signal events' energy.
spectral_indices : array-like, optional
Spectral indices (gamma) for reweighting. Default is [2.0].
angular_cutoff : float, optional
Maximum angular separation for King PDF. Default is pi.
Attributes
----------
fit_alpha : ndarray
Fitted alpha parameters for each bin.
fit_beta : ndarray
Fitted beta parameters for each bin.
histograms : ndarray
Histogram values for each bin.
uncertainties : ndarray
Histogram uncertainties for each bin.
dpsi_bins : ndarray
Angular error bin edges for each bin.
fit_quality : ndarray
Chi-square values indicating fit quality.
"""
def __init__(
self,
signal_events: npt.NDArray[Any],
parametrization_bins: Dict[str, Union[int, List, Tuple, npt.NDArray]],
dpsi_nbins: int = 101,
minimum_counts: int = 100,
remove_weight_outliers=True,
weight_outlier_percentiles=[0, 95],
weight_field: Optional[str] = "ow",
true_ra_name: str = "trueRa",
true_dec_name: str = "trueDec",
true_energy_name: str = "trueE",
spectral_indices: Optional[Union[List[float], npt.NDArray[np.floating]]] = None,
angular_cutoff: float = np.pi,
) -> None:
"""Initialize the KingPSFFitter."""
self.signal_events = signal_events
self.remove_weight_outliers = remove_weight_outliers
self.weight_outlier_percentiles = weight_outlier_percentiles
self.weight_field = weight_field
self.true_ra_name = true_ra_name
self.true_dec_name = true_dec_name
self.true_energy_name = true_energy_name
self.dpsi_nbins = dpsi_nbins
self.minimum_counts = minimum_counts
self.spectral_indices = (
np.atleast_1d(spectral_indices) if spectral_indices is not None else np.array([2.0])
)
self.angular_cutoff = angular_cutoff
# Initialize King PDF
self.king_pdf = KingPDF(angular_cutoff=angular_cutoff)
# Validate and setup binning
self._validate_fields(parametrization_bins)
self.parametrization_bins = self._setup_bins(parametrization_bins)
self.bin_names = list(self.parametrization_bins.keys())
self.parametrization_shape = [len(bins) - 1 for bins in self.parametrization_bins.values()]
# Calculate angular distances
self.dpsi = angular_distance(
self.signal_events["ra"],
self.signal_events["dec"],
self.signal_events[self.true_ra_name],
self.signal_events[self.true_dec_name],
)
# Bin events
self.event_indices = self._bin_events()
# Pre-group events by flat bin index for O(1) per-bin lookup in fit_all_bins.
# Replaces the per-iteration boolean-mask construction over all events.
_shape = tuple(self.parametrization_shape)
_idx_arrays = [
np.clip(self.event_indices[k], 1, s) - 1 # 0-based, clipped in-range
for k, s in zip(self.bin_names, _shape)
]
_flat = np.ravel_multi_index(_idx_arrays, _shape)
self._event_sort_order = np.argsort(_flat, kind="stable")
_sorted_flat = _flat[self._event_sort_order]
_n_bins = int(np.prod(_shape))
self._bin_boundaries = np.searchsorted(_sorted_flat, np.arange(_n_bins + 1))
# Initialize storage arrays
self._initialize_storage()
def _validate_fields(
self, parametrization_bins: Dict[str, Union[int, List, Tuple, npt.NDArray]]
) -> None:
"""
Validate that required and parameterization fields exist in signal events.
Parameters
----------
parametrization_bins : dict
Dictionary of binning specifications.
Raises
------
ValueError
If required fields are missing.
"""
required_fields = ["ra", "dec", self.true_ra_name, self.true_dec_name]
names = self.signal_events.dtype.names or ()
missing_required = [f for f in required_fields if f not in names]
if missing_required:
raise ValueError(f"Signal events missing required fields: {missing_required}")
missing_params = [key for key in parametrization_bins.keys() if key not in names]
if missing_params:
raise ValueError(
f"Parametrization fields {missing_params} not found in signal events. "
f"Available fields: {names}"
)
if self.weight_field is not None and (self.weight_field not in names):
raise ValueError(f"Weight field '{self.weight_field}' not found in signal events.")
def _setup_bins(
self, parametrization_bins: Dict[str, Union[int, List, Tuple, npt.NDArray]]
) -> Dict[str, npt.NDArray[np.floating]]:
"""
Convert binning specifications to explicit bin edges.
Parameters
----------
parametrization_bins : dict
Dictionary mapping field names to bin specs (int or array).
Returns
-------
dict
Dictionary mapping field names to bin edge arrays.
"""
bins_dict = {}
for key, val in parametrization_bins.items():
if isinstance(val, int):
# Create equal-probability bins
bins_dict[key] = self._get_percentile_bins(val, self.signal_events[key])
elif isinstance(val, (tuple, list, np.ndarray)):
bins_dict[key] = np.asarray(val)
else:
raise ValueError(
f"Unknown binning specification for '{key}': {val}. "
"Use int for number of bins or array-like for bin edges."
)
return bins_dict
def _get_percentile_bins(
self,
nbins: int,
values: npt.NDArray[np.floating],
weights: Optional[npt.NDArray[np.floating]] = None,
) -> npt.NDArray[np.floating]:
"""
Create bins with approximately equal number of (weighted) events.
Parameters
----------
nbins : int
Number of bins to create.
values : ndarray
Values to bin.
weights : ndarray, optional
Event weights. If None, equal weights used.
Returns
-------
ndarray
Bin edges.
"""
if weights is None:
weights = np.ones(len(values))
# Sort and create cumulative distribution
sorted_idx = np.argsort(values)
cumulative = np.cumsum(weights[sorted_idx]) / weights.sum()
# Find bin edges at equal probability intervals
percentiles = np.linspace(0, 1, nbins + 1)
positions = np.searchsorted(cumulative, percentiles)
positions = np.clip(positions, 0, len(values) - 1)
# Handle duplicates by using unique values
positions = np.unique(positions)
bin_edges = values[sorted_idx][positions]
# Ensure we have at least 2 edges (1 bin)
if len(bin_edges) < 2:
return np.array([values.min(), values.max()])
return bin_edges
def _bin_events(self) -> Dict[str, npt.NDArray[np.integer]]:
"""
Assign each event to a bin index for each parameterization dimension.
Returns
-------
dict
Dictionary mapping field names to bin indices for each event.
"""
event_indices = {}
for key, bins in self.parametrization_bins.items():
event_indices[key] = np.digitize(self.signal_events[key], bins)
return event_indices
def _initialize_storage(self) -> None:
"""Initialize arrays to store fit results and diagnostics."""
shape_with_gamma = [len(self.spectral_indices)] + self.parametrization_shape
# Fit parameters
self.fit_alpha = np.full(shape_with_gamma, np.median(self.dpsi))
self.fit_beta = np.full(shape_with_gamma, 2.25)
# Diagnostics
self.histograms = np.zeros(shape_with_gamma + [self.dpsi_nbins], dtype=float)
self.uncertainties = np.zeros(shape_with_gamma + [self.dpsi_nbins], dtype=float)
self.dpsi_bins = np.zeros(shape_with_gamma + [self.dpsi_nbins + 1], dtype=float)
self.fit_quality = np.zeros(shape_with_gamma, dtype=float)
self.event_counts = np.zeros(shape_with_gamma, dtype=int)
[docs]
def fit_all_bins(self, verbose: bool = True) -> Dict[str, npt.NDArray]:
"""
Fit King PSF parameters in all bins.
Iterates over all bins defined by parametrization_bins and spectral_indices,
fitting King distribution parameters to the angular error distribution.
Parameters
----------
verbose : bool, optional
Print progress information. Default is True.
Returns
-------
dict
Dictionary containing fit results with keys:
- 'alpha': fit_alpha array
- 'beta': fit_beta array
- 'histograms': histogram values
- 'uncertainties': histogram uncertainties
- 'dpsi_bins': angular error bin edges
- 'fit_quality': chi-square values
- 'event_counts': number of events per bin
"""
if verbose:
print(f"Fitting King PSF in {np.prod(self.parametrization_shape)} bins...")
print(f" Spectral indices: {self.spectral_indices}")
print(f" Binning dimensions: {self.bin_names}")
# Iterate over spectral indices
for g_idx, gamma in enumerate(self.spectral_indices):
if verbose:
print(f"\n Spectral index γ = {gamma:.2f}")
# Calculate event weights
if self.weight_field is not None:
weights = self.signal_events[self.weight_field] * self.signal_events[
self.true_energy_name
] ** (-gamma)
else:
weights = np.ones(len(self.signal_events))
# Iterate over all bin combinations
n_fitted = 0
n_skipped = 0
total_bins = np.prod(self.parametrization_shape)
for bin_indices in tqdm(np.ndindex(*self.parametrization_shape), total=total_bins):
flat_idx = int(np.ravel_multi_index(bin_indices, tuple(self.parametrization_shape)))
event_idx = self._event_sort_order[
self._bin_boundaries[flat_idx] : self._bin_boundaries[flat_idx + 1]
]
if self.remove_weight_outliers and len(event_idx) > 0:
bin_weights = weights[event_idx]
idx_range = [
int(len(bin_weights) * self.weight_outlier_percentiles[0] / 100),
int(len(bin_weights) * self.weight_outlier_percentiles[1] / 100),
]
idx = np.digitize(bin_weights, np.unique(bin_weights))
event_idx = event_idx[(idx_range[0] <= idx) & (idx <= idx_range[1])]
n_events = len(event_idx)
param_idx = tuple([g_idx] + list(bin_indices))
self.event_counts[param_idx] = n_events
# Skip if insufficient events
if n_events < self.minimum_counts:
n_skipped += 1
continue
# Fit this bin
success = self._fit_single_bin(event_idx, weights, param_idx)
if success:
n_fitted += 1
else:
n_skipped += 1
if verbose:
print(f" Fitted {n_fitted} bins, skipped {n_skipped} bins")
if verbose:
print("\nFitting complete!")
return {
"alpha": self.fit_alpha,
"beta": self.fit_beta,
"histograms": self.histograms,
"uncertainties": self.uncertainties,
"dpsi_bins": self.dpsi_bins,
"fit_quality": self.fit_quality,
"event_counts": self.event_counts,
"parametrization_bins": self.parametrization_bins, # type: ignore[dict-item]
}
def _cdf_chi2(self, cdf_hist, cdf_variance, bins, alpha, beta):
try:
cdf, grad_alpha, grad_beta = _cdf_and_gradient(
bins[1:], alpha, beta, self.angular_cutoff
)
except ZeroDivisionError:
return 100000.0, np.zeros(2)
scale = cdf_hist[-1] / cdf[-1]
expected = scale * cdf
residuals = cdf_hist - expected
inv_variance = 1.0 / np.nextafter(cdf_variance, np.inf)
n_bins = len(bins)
val = np.sum(residuals**2 * inv_variance) / n_bins
if not np.isfinite(val):
return 100000.0, np.zeros(2)
# d(chi2)/dtheta = (-2*scale/n_bins) * sum(r/var * (dCDF - (cdf/cdf_last)*dCDF_last))
weighted_residuals = residuals * inv_variance
cdf_ratio = cdf / cdf[-1]
grad = (-2.0 * scale / n_bins) * np.array(
[
np.sum(weighted_residuals * (grad_alpha - cdf_ratio * grad_alpha[-1])),
np.sum(weighted_residuals * (grad_beta - cdf_ratio * grad_beta[-1])),
]
)
return val, grad
def _fit_single_bin(
self,
event_idx: npt.NDArray[np.intp],
weights: npt.NDArray[np.floating],
param_idx: Tuple[int, ...],
) -> bool:
"""
Fit King parameters for a single bin.
Parameters
----------
event_idx : ndarray
Integer indices of events in this bin.
weights : ndarray
Event weights.
param_idx : tuple
Index tuple for storing results.
Returns
-------
bool
True if fit succeeded, False otherwise.
"""
# Extract events in this bin
masked_dpsi = self.dpsi[event_idx]
masked_weights = weights[event_idx]
masked_weights /= masked_weights.sum() # Normalize
# Create bins for this subset. Also calculate the
# phase space parameter while we're here. We'll need
# it in order to store the histograms as PDFs later.
dpsi_bins = self._get_percentile_bins(self.dpsi_nbins, masked_dpsi, masked_weights)
dpsi_bins = np.unique(dpsi_bins)
delta = -2 * np.pi * np.diff(np.cos(dpsi_bins))
self.dpsi_bins[param_idx][: len(dpsi_bins)] = dpsi_bins
# If we don't have enough bins, skip
if len(dpsi_bins) < 3:
return False
bin_centers = (dpsi_bins[:-1] + dpsi_bins[1:]) / 2
# Create weighted histogram
hist, _ = np.histogram(masked_dpsi, bins=dpsi_bins, weights=masked_weights)
hist2, _ = np.histogram(masked_dpsi, bins=dpsi_bins, weights=masked_weights**2)
cdf_hist = np.cumsum(hist)
cdf_variance = np.cumsum(hist2) / np.sum(hist) ** 2
# Get initial guess from peak location.
alpha_guess = bin_centers[np.searchsorted(cdf_hist, 0.5)]
best_params = None
best_chi2 = np.inf
for beta in [1.25, 1.75, 2, 2.5, 4, 7, 9]:
result = minimize(
lambda params: self._cdf_chi2(cdf_hist, cdf_variance, dpsi_bins, *params),
[alpha_guess, beta],
method="L-BFGS-B",
jac=True,
bounds=[
(np.nextafter(1e-4, np.pi), np.nextafter(self.angular_cutoff, 0)),
(np.nextafter(1, 2), np.nextafter(1000, 1)),
],
)
if result.success and (best_chi2 > result.fun):
best_params, best_chi2 = result.x, result.fun
# Store histogram data (pad/truncate to match storage size).
# Make sure to rescale by the phase space to get densities.
n_store = min(len(hist), self.dpsi_nbins)
self.histograms[param_idx][:n_store] = hist[:n_store] / delta
self.uncertainties[param_idx][:n_store] = np.sqrt(hist2[:n_store]) / delta
self.dpsi_bins[param_idx][: len(dpsi_bins)] = dpsi_bins
# Store results if we found a solution
if best_params is not None:
self.fit_alpha[param_idx] = best_params[0]
self.fit_beta[param_idx] = best_params[1]
self.fit_quality[param_idx] = best_chi2
return True
return False
[docs]
def get_interpolator(self, gamma_index: int = 0) -> Tuple[Any, Any]:
"""
Get an interpolator for fitted parameters at a given spectral index.
Parameters
----------
gamma_index : int, optional
Index of the spectral index to use. Default is 0.
Returns
-------
tuple
(alpha_interpolator, beta_interpolator) functions that interpolate
fitted parameters based on parameterization bin values.
Notes
-----
Requires scipy.interpolate.RegularGridInterpolator (not imported by default
to avoid dependency). This method will raise ImportError if scipy is not available.
"""
from scipy.interpolate import RegularGridInterpolator
# Get bin centers for each dimension
bin_centers = []
for key in self.bin_names:
bins = self.parametrization_bins[key]
centers = (bins[:-1] + bins[1:]) / 2
bin_centers.append(centers)
# Create interpolators
alpha_interp = RegularGridInterpolator(
tuple(bin_centers),
self.fit_alpha[gamma_index],
bounds_error=False,
fill_value=None, # Extrapolate
)
beta_interp = RegularGridInterpolator(
tuple(bin_centers),
self.fit_beta[gamma_index],
bounds_error=False,
fill_value=None, # Extrapolate
)
return alpha_interp, beta_interp
[docs]
def plot_fit(
self,
bin_indices: Union[Tuple[int, ...], Dict[str, int]],
gamma_index: int = 0,
ax: Optional[Any] = None,
) -> Any:
"""
Plot the fitted King PDF for a specific bin.
Parameters
----------
bin_indices : tuple or dict
Indices of the bin to plot. Can be tuple of integers or dict
mapping bin names to indices.
gamma_index : int, optional
Index of spectral index. Default is 0.
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, creates new figure.
Returns
-------
matplotlib.axes.Axes
The axes object.
Notes
-----
Requires matplotlib (not imported by default). This method will raise
ImportError if matplotlib is not available.
"""
import matplotlib.pyplot as plt
if ax is None:
fig, ax = plt.subplots(figsize=(8, 6))
# Convert dict to tuple if needed
if isinstance(bin_indices, dict):
bin_indices = tuple(bin_indices[key] for key in self.bin_names)
param_idx = tuple([gamma_index] + list(bin_indices))
# Get histogram data
hist = self.histograms[param_idx]
uncertainty = self.uncertainties[param_idx]
bins = self.dpsi_bins[param_idx]
# Only plot non-zero bins
mask = hist > 0
bin_centers = (bins[:-1] + bins[1:])[mask] / 2
# Plot histogram
ax.errorbar(
np.degrees(bin_centers),
hist[mask],
yerr=uncertainty[mask],
fmt="o",
label="MC Events",
color="black",
markersize=4,
)
# Plot fitted King PDF
alpha = self.fit_alpha[param_idx]
beta = self.fit_beta[param_idx]
dpsi_fine = np.linspace(0, min(8 * alpha, np.pi), 1000)
pdf_fit = cast(npt.NDArray[np.floating], self.king_pdf.pdf(dpsi_fine, alpha, beta))
pdf_fit *= hist[mask].max() / pdf_fit.max() # Normalize for visualization
ax.plot(np.degrees(dpsi_fine), pdf_fit, "-", linewidth=2, label="King Fit", color="blue")
# Add labels
ax.set_xlabel("Angular Error (degrees)")
ax.set_ylabel("Normalized Density")
ax.set_yscale("log")
ax.grid(alpha=0.3)
ax.legend()
# Add fit parameters to plot
title = f"γ={self.spectral_indices[gamma_index]:.2f}, "
title += f"α={np.degrees(alpha):.3f}°, β={beta:.2f}\n"
for i, key in enumerate(self.bin_names):
bin_idx = bin_indices[i]
bins = self.parametrization_bins[key]
title += f"{key}=[{bins[bin_idx]:.2e}, {bins[bin_idx + 1]:.2e}] "
ax.set_title(title, fontsize=10)
return ax