"""Wigner small-d functions and correlation functions for spin-weighted spherical harmonics.
This module provides efficient implementations of Wigner small-d functions and their associated
correlation functions, which are fundamental for analyzing spin-weighted fields on the sphere
(e.g., CMB polarization, gravitational lensing).
The implementation uses DUCC0's `alm2leg` and `leg2alm` functions, which were not originally
optimized for Wigner transforms but achieve excellent performance through compiler optimization.
Key Concepts
------------
Wigner small-d functions :math:`d^\\ell_{s_1 s_2}(\\theta)` are the angular parts of the Wigner D-matrices,
describing the rotation of spin-weighted spherical harmonics. They appear in:
- Correlation functions of spin-weighted fields (e.g., ξ± for CMB polarization or galaxy surveys)
- Angular power spectrum estimators for CMB polarization
- Lensing reconstruction and delensing operations
- General spin transformations on the sphere
Main Functions
--------------
wignerpos : Compute Wigner correlation function from power spectrum
Forward transform: C_ℓ → ξ(θ)
wignercoeff : Compute power spectrum from Wigner correlation function
Adjoint transform: ξ(θ) → C_ℓ
wigner4pos : Compute 4 correlation functions simultaneously
Efficient for computing ξ+ and ξ- together
wignerc : Convolve two Wigner correlation functions
Used for computing products of correlation functions
wignerd : Single Wigner d-function
Returns d^ℓ_{s1,s2}(θ) for a specific ℓ
Utility Functions
-----------------
get_thgwg : Gauss-Legendre quadrature points and weights
For integration over [0, π]
get_xgwg : Gauss-Legendre quadrature over arbitrary interval [a, b]
Examples
--------
Compute CMB E-mode correlation function:
>>> import numpy as np
>>> from lenspyx.wigners import wignerpos
>>> # E-mode power spectrum
>>> cl_ee = np.loadtxt('cl_ee.txt')
>>> theta = np.linspace(0, np.pi, 100)
>>> # Compute ξ_EE(θ) = Σ_ℓ (2ℓ+1)/(4π) C_ℓ^EE d^ℓ_{2,2}(θ)
>>> xi_ee = wignerpos(cl_ee, theta, s1=2, s2=2)
Compute ξ± correlation functions:
>>> from lenspyx.wigners import wigner4pos
>>> # Returns [ξ+, ξ-] where:
>>> # ξ+ = Σ_ℓ (2ℓ+1)/(4π) C_ℓ d^ℓ_{2,+2}(θ)
>>> # ξ- = Σ_ℓ (2ℓ+1)/(4π) C_ℓ d^ℓ_{2,-2}(θ)
>>> xi_plus, xi_minus = wigner4pos(cl_ee, None, theta, s1=2, s2=2)
Invert correlation function to get power spectrum:
>>> from lenspyx.wigners import wignercoeff, get_thgwg
>>> lmax = 2000
>>> npts = (lmax + 2) // 2
>>> theta, weights = get_thgwg(npts)
>>> # Measure ξ(θ) from data
>>> xi_measured = measure_correlation(theta)
>>> # Compute C_ℓ = 2π Σ_θ ξ(θ) d^ℓ_{s1,s2}(θ)
>>> cl_recovered = wignercoeff(xi_measured, theta, s1=2, s2=2, lmax=lmax)
Convolve two correlation functions:
>>> from lenspyx.wigners import wignerc
>>> cl1 = cl_ee # E-mode spectrum
>>> cl2 = cl_bb # B-mode spectrum
>>> # Compute spectrum of ξ_EE(θ) * ξ_BB(θ)
>>> cl_product = wignerc(cl1, cl2, s1=2, t1=2, s2=2, t2=2)
Notes
-----
- Spin values s1, s2 can be positive, negative, or zero
- The normalization includes (2ℓ+1)/(4π) factors
- Gauss-Legendre quadrature ensures exact integration for band-limited functions
- Internal caching of GL points improves performance for repeated calls
References
----------
.. [1] Varshalovich, D.A., Moskalev, A.N. and Khersonskii, V.K., 1988.
Quantum theory of angular momentum. World Scientific.
.. [2] Hivon, E., et al., 2002. HEALPix: A Framework for High-Resolution
Discretization and Fast Analysis of Data Distributed on the Sphere.
ApJ, 567, 2.
.. [3] Reinecke, M. and Seljebotn, D.S., 2013. Libsharp–spherical harmonic
transforms revisited. A&A, 554, A112.
"""
from __future__ import annotations
import numpy as np
from ducc0.sht import alm2leg, leg2alm
from ducc0.misc import GL_thetas, GL_weights
GL_cache = {}
verbose = False
[docs]
def wignerpos(cl: np.ndarray[float], theta: np.ndarray[float], s1: int, s2: int):
r"""Produces Wigner small-d transform defined by
.. math::
\sum_\ell \frac{2\ell + 1}{4\pi} C_\ell d^\ell_{s_1 s_2}(\theta)
Parameters
----------
cl : array_like
Spectrum of Wigner small-d transform (power spectrum :math:`C_\ell`)
theta : array_like
Co-latitude in radians (in [0, π])
s1 : int
First spin weight
s2 : int
Second spin weight
Returns
-------
array_like
Real array of same size as theta containing the correlation function
Notes
-----
You can use :func:`wigner4pos` instead if you also need the result for -s2
(e.g., for computing :math:`\xi_{\pm}` simultaneously).
See Also
--------
wigner4pos : Compute 4 correlation functions in one go
wignercoeff : Adjoint transform (correlation function to spectrum)
"""
return wigner4pos(cl, None, theta, s1, s2)[0 if s2 >= 0 else 1]
[docs]
def wigner4pos(gl: np.ndarray[float], cl: np.ndarray[float] or None, theta: np.ndarray[float], s1: int, s2: int):
r"""Compute 4 Wigner correlation functions in one go.
Parameters
----------
gl : array_like
First spectrum (power spectrum :math:`g_\ell`)
cl : array_like or None
Second spectrum (power spectrum :math:`c_\ell`). Can be set to None if irrelevant,
and is ignored if s2 is zero.
theta : array_like
Co-latitude in radians (in [0, π])
s1 : int
First spin weight
s2 : int
Second spin weight
Returns
-------
array_like
In the most general case, an array of shape (ncomp, ntheta) with:
- Component 0: :math:`\sum_\ell g_\ell \frac{2\ell + 1}{4\pi} d^\ell_{s_1, |s_2|}(\theta)`
- Component 1: :math:`\sum_\ell g_\ell \frac{2\ell + 1}{4\pi} d^\ell_{s_1,-|s_2|}(\theta)`
- Component 2: :math:`\sum_\ell c_\ell \frac{2\ell + 1}{4\pi} d^\ell_{s_1, |s_2|}(\theta)` (if cl is not None)
- Component 3: :math:`\sum_\ell c_\ell \frac{2\ell + 1}{4\pi} d^\ell_{s_1,-|s_2|}(\theta)` (if cl is not None)
The number of components ncomp in the output is:
- 4 if (s2 ≠ 0 and cl is not None)
- 2 if (s2 ≠ 0 and cl is None)
- 1 if s2 = 0
Notes
-----
This function is more efficient than calling :func:`wignerpos` multiple times when
you need correlation functions for both +s2 and -s2 (e.g., ξ+ and ξ-).
See Also
--------
wignerpos : Compute single correlation function
"""
standard = cl is not None
if standard:
lmax = (max(len(cl), len(gl)) if s2 else len(cl)) - 1
mode = 'STANDARD'
nout = 4 if s2 else 1
else:
lmax = len(gl) - 1
mode = 'GRAD_ONLY' if s2 else 'STANDARD'
nout = 2 if s2 else 1
if s1 == 0 and s2: # Always prefer a faster spin-0 sht
sgn_s = 1 if s2 % 2 == 0 else -1
wig_g = wigner4pos(gl, None, theta, abs(s2), 0)[0]
if standard:
wig_c = wigner4pos(cl, None, theta, abs(s2), 0)[0]
return np.stack([wig_g * sgn_s, wig_g, sgn_s * wig_c, wig_c])
return np.stack([wig_g * sgn_s, wig_g])
s1_pos = s1 >= 0
sgn_s1 = 1 if s1_pos else (1 if (s1 + s2) % 2 == 0 else -1)
ncomp = 1 + (s2 != 0) * standard
gclm = np.empty((ncomp, lmax + 1), dtype=complex)
prefac = np.sqrt(np.arange(1, 2 * lmax + 3, 2)) * (sgn_s1 / np.sqrt(4 * np.pi))
gclm[0, :len(gl)] = prefac * gl[:len(gl)]
if s2 and standard:
gclm[1, :len(cl)] = prefac * cl[:len(cl)]
leg = alm2leg(alm=gclm, spin=abs(s2), lmax=lmax, mval=np.array([abs(s1)], dtype=int),
mstart=np.array([0]), theta=theta, mode=mode).squeeze()
wig = np.zeros((nout, theta.size), float)
if s2:
s_sgn = (1 if s2 % 2 == 0 else -1)
wig[0 if s1_pos else 1] = -(leg[0].real + leg[1].imag)
wig[1 if s1_pos else 0] = -s_sgn * (leg[0].real - leg[1].imag)
if standard:
wig[2 if s1_pos else 3] = -(leg[1].real - leg[0].imag)
wig[3 if s1_pos else 2] = -s_sgn * (leg[1].real + leg[0].imag)
return wig
else:
wig[0] = leg.real
return wig
[docs]
def wignerd(l: int, s1: int, s2: int, theta: np.ndarray):
r"""Returns Wigner small-d functions for a specific multipole.
Computes the normalized Wigner d-functions:
.. math::
\frac{2\ell + 1}{4\pi} d^\ell_{s_1, |s_2|}(\theta)
and
.. math::
\frac{2\ell + 1}{4\pi} d^\ell_{s_1,-|s_2|}(\theta)
for all θ. If s2 is zero, only the first component is returned.
Parameters
----------
l : int
Multipole moment
s1 : int
First spin weight
s2 : int
Second spin weight
theta : array_like
Co-latitude angles in radians
Returns
-------
array_like
Array of shape (2, ntheta) if s2 ≠ 0, or (1, ntheta) if s2 = 0
See Also
--------
wignerdl : Returns d^l_{s1,s2}(θ) without normalization for all ℓ
"""
gl = np.zeros(l + 1)
gl[-1] = 1.
return wigner4pos(gl, None, theta, s1, s2)
[docs]
def wignercoeff(xi: np.ndarray[float], theta: np.ndarray[float], s1: int, s2: int, lmax: int):
r"""Computes spectrum of Wigner small-d correlation function (adjoint to wignerpos).
This is the adjoint transform that converts a correlation function to its
power spectrum:
.. math::
C_\ell = 2\pi \sum_\theta \xi(\theta) d^\ell_{s_1 s_2}(\theta)
Parameters
----------
xi : array_like
Wigner correlation function (real array, one value per co-latitude)
theta : array_like
Co-latitude in radians (in [0, π])
s1 : int
First spin weight
s2 : int
Second spin weight
lmax : int
Maximum multipole (inclusive). Spectrum is calculated from 0 to lmax.
Returns
-------
array_like
Power spectrum :math:`C_\ell` from ℓ=0 to ℓ=lmax
Notes
-----
This function implements the adjoint operation to :func:`wignerpos`. It is used
to estimate power spectra from measured correlation functions.
See Also
--------
wignerpos : Forward transform (spectrum to correlation function)
"""
if s1 < 0:
t_xi = xi if (s1 + s2) % 2 == 0 else -xi
return wignercoeff(t_xi, theta, -s1, -s2, lmax)
if s1 == 0 and s2 != 0:
# always want a spin 0 on the SHT side
t_xi = xi if (s1 + s2) % 2 == 0 else -xi
return wignercoeff(t_xi, theta, s2, s1, lmax)
mval = np.array([abs(s1)], dtype=int)
mstart = np.array([0], dtype=int)
fac = (2 * np.pi * np.sqrt(4 * np.pi)) / np.sqrt(np.arange(1, 2 * lmax + 3, 2))
xis = xi.astype(complex)
lmin = max(abs(s2), abs(s1))
if s2 == 0:
cl = leg2alm(leg=np.reshape(xis, (1, xi.size, 1)), spin=0, mval=mval, mstart=mstart, theta=theta,
lmax=lmax, mode='STANDARD').squeeze().real
sgn = 1 if s1 > 0 else (1 if abs(s1) % 2 == 0 else -1)
cl[:lmin] = 0.
return sgn * cl * fac
else:
xis = np.stack([xis, (1j * np.sign(s2)) * xis]).reshape((2, xi.size, 1))
cl = leg2alm(leg=xis, spin=abs(s2), mval=mval, mstart=mstart, theta=theta, lmax=lmax,
mode='GRAD_ONLY').squeeze().real
sgn = -1 if s2 > 0 else (-1 if abs(s2) % 2 == 0 else 1)
cl[:lmin] = 0.
return sgn * cl * fac
[docs]
def wignerc(cl1: np.ndarray[float or complex], cl2:np.ndarray[float or complex], s1: int, t1: int, s2: int, t2: int,
lmax_out: int = -1):
r"""Convolution of two Wigner small-d correlation functions.
Computes the power spectrum of the product of two correlation functions:
.. math::
\xi_{s_1 t_1}(\mu) \times \xi_{s_2 t_2}(\mu)
where
.. math::
\xi_{s_1 t_1}(\mu) = \sum_{\ell} C_{1,\ell} \frac{2\ell + 1}{4\pi} d^\ell_{s_1 t_1}(\mu)
.. math::
\xi_{s_2 t_2}(\mu) = \sum_{\ell} C_{2,\ell} \frac{2\ell + 1}{4\pi} d^\ell_{s_2 t_2}(\mu)
Gauss-Legendre quadrature is used to solve this exactly (up to numerical precision).
Parameters
----------
cl1 : array_like
Spectrum of first Wigner small-d function (:math:`C_{1,\ell}`)
cl2 : array_like
Spectrum of second Wigner small-d function (:math:`C_{2,\ell}`)
s1 : int
First spin of first function
t1 : int
Second spin of first function
s2 : int
First spin of second function
t2 : int
Second spin of second function
lmax_out : int, optional
Maximum multipole of output spectrum. Defaults to len(cl1) + len(cl2) - 2.
Returns
-------
array_like
Power spectrum of the product correlation function, from ℓ=0 to ℓ=lmax_out
Notes
-----
This function is useful for computing non-Gaussian covariances and higher-order statistics.
The output has spins (s1+s2, t1+t2).
"""
lmax1 = len(cl1) - 1
lmax2 = len(cl2) - 1
lmax_out = lmax1 + lmax2 if lmax_out < 0 else lmax_out
lmax_tot = lmax1 + lmax2 + lmax_out
so = s1 + s2
to = t1 + t2
if np.any(cl1) and np.any(cl2):
npts = (lmax_tot + 2 - lmax_tot % 2) // 2
if not 'tht wg %s' % npts in GL_cache.keys():
GL_cache['tht wg %s' % npts] = get_thgwg(npts)
tht, wg = GL_cache['tht wg %s' % npts]
if np.iscomplexobj(cl1):
xi1 = wignerpos(np.real(cl1), tht, s1, t1) + 1j * wignerpos(np.imag(cl1), tht, s1, t1)
else:
xi1 = wignerpos(cl1, tht, s1, t1)
if np.iscomplexobj(cl2):
xi2 = wignerpos(np.real(cl2), tht, s2, t2) + 1j * wignerpos(np.imag(cl2), tht, s2, t2)
else:
xi2 = wignerpos(cl2, tht, s2, t2)
xi1xi2w = xi1 * xi2 * wg
if np.iscomplexobj(xi1xi2w):
ret = wignercoeff(np.real(xi1xi2w), tht, so, to, lmax_out)
ret = ret + 1j * wignercoeff(np.imag(xi1xi2w), tht, so, to, lmax_out)
return ret
else:
return wignercoeff(xi1xi2w, tht, so, to, lmax_out)
else:
return np.zeros(lmax_out + 1, dtype=float)
[docs]
def wignerdl(s1: int, s2: int, theta: float, lmax: int):
r"""Returns the Wigner d-function for all multipoles.
Computes the Wigner d-function:
.. math::
d^\ell_{s_1 s_2}(\theta)
for all ℓ from 0 to lmax.
Parameters
----------
s1 : int
First spin weight
s2 : int
Second spin weight
theta : float
Co-latitude angle in radians (scalar)
lmax : int
Maximum multipole moment
Returns
-------
array_like
Array of size lmax+1 containing :math:`d^\ell_{s_1 s_2}(\theta)` for ℓ=0 to lmax
Notes
-----
This returns the unnormalized Wigner d-function, without the (2ℓ+1)/(4π) factor.
For the normalized version, use :func:`wignerd`.
See Also
--------
wignerd : Normalized Wigner d-function for a specific ℓ
"""
assert np.isscalar(theta), 'scalar theta input here'
return wignercoeff(np.array([1.]), np.array([theta]), s1, s2, lmax) / (2 * np.pi)
[docs]
def get_thgwg(npts: int):
"""Gauss-Legendre integration points and weights from DUCC0.
Provides quadrature points and weights for integration over the interval [0, π],
optimized for integrating functions on the sphere.
Parameters
----------
npts : int
Number of quadrature points
Returns
-------
tht : array_like
Co-latitude points in radians (array of size npts)
wg : array_like
Quadrature weights (array of size npts)
Notes
-----
This uses DUCC0's highly optimized Gauss-Legendre quadrature implementation.
For a band-limited function up to ℓ_max, use npts ≥ (ℓ_max + 2) / 2 for
exact integration.
Examples
--------
>>> from lenspyx.wigners import get_thgwg
>>> theta, weights = get_thgwg(100)
>>> # Integrate a function f(θ) over the sphere:
>>> integral = np.sum(f(theta) * weights)
"""
tht = GL_thetas(npts)
wg = GL_weights(npts, 1) / (2 * np.pi)
return tht, wg
[docs]
def get_xgwg(a: float, b: float, npts: int):
"""Gauss-Legendre points and weights for integration over an arbitrary interval.
Provides quadrature points and weights for integration over the interval [a, b].
Parameters
----------
a : float
Lower bound of integration interval
b : float
Upper bound of integration interval
npts : int
Number of quadrature points
Returns
-------
xg : array_like
Quadrature points within (a, b) (array of size npts)
wg : array_like
Quadrature weights (array of size npts)
Notes
-----
This function transforms the standard Gauss-Legendre quadrature on [-1, 1]
to an arbitrary interval [a, b].
Examples
--------
>>> from lenspyx.wigners import get_xgwg
>>> x, weights = get_xgwg(0, 10, 50)
>>> # Integrate a function f(x) over [0, 10]:
>>> integral = np.sum(f(x) * weights)
"""
tht = GL_thetas(npts)
wg = GL_weights(npts, 1) / (2 * np.pi)
c = 0.5 * (a + b)
d = 0.5 * (b - a)
return (c + np.cos(tht) * d)[::-1], (wg * d)[::-1]