# Copyright (c) 2003-2024 by Mike Jarvis
#
# TreeCorr is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions, and the disclaimer given in the accompanying LICENSE
# file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the disclaimer given in the documentation
# and/or other materials provided with the distribution.
"""
.. module:: nnkcorrelation
"""
import numpy as np
from . import _treecorr
from .catalog import calculateVarK
from .corr3base import Corr3
[docs]class NKKCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point count-scalar-scalar correlation
function, where as usual K represents any spin-0 scalar field.
With this class, point 1 of the triangle (i.e. the vertex opposite d1) is the one with the
scalar value. Use `KNKCorrelation` and `KKNCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> nkk = treecorr.NKKCorrelation(config)
>>> nkk.process(cat1, cat2) # Compute cross-correlation of two fields.
>>> nkk.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> nkk.write(file_name) # Write out to a file.
>>> rkk.process(rand, cat2) # Compute cross-correlation with randoms.
>>> nkk.calculateZeta(rkk=rkk) # Calculate zeta using randoms
>>> zeta = nkk.zeta # Access correlation function
>>> zetar = nkk.zetar # Or access real and imaginary parts separately
>>> zetai = nkk.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'NKKCorrelation'
_letter1 = 'N'
_letter2 = 'K'
_letter3 = 'K'
_letters = 'NKK'
_builder = _treecorr.NKKCorr
_calculateVar1 = lambda *args, **kwargs: None
_calculateVar2 = staticmethod(calculateVarK)
_calculateVar3 = staticmethod(calculateVarK)
_sig1 = None
_sig2 = 'sig_k'
_sig3 = 'sig_k'
_default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._rkk = None
shape = self.data_shape
self._z[0] = np.zeros(shape, dtype=float)
if self.bin_type == 'LogMultipole':
self._z[1] = np.zeros(shape, dtype=float)
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building NKKCorr')
@property
def raw_zeta(self):
return self._z[0]
@property
def zeta(self):
if self._zeta is None:
if self._z[1].size:
return self._z[0] + 1j * self._z[1]
else:
return self._z[0]
else:
return self._zeta
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._rkk is not None and self._rkk is not True:
ret._rkk = self._rkk.copy()
return ret
[docs] def finalize(self, vark1, vark2):
"""Finalize the calculation of the correlation function.
Parameters:
vark1 (float): The variance of the first scalar field.
vark2 (float): The variance of the second scalar field.
"""
self._finalize()
self._var_num = vark1 * vark2
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._rkk = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, rkk=None):
r"""Calculate the correlation function possibly given another correlation function
that uses random points for the foreground objects.
- If rkk is None, the simple correlation function (self.zeta) is returned.
- If rkk is not None, then a compensated calculation is done:
:math:`\zeta = (DKK - RKK)`, where DKK represents the correlation of the kappa
field with the data points and RKK represents the correlation with random points.
After calling this function, the attributes ``zeta``, ``varzeta`` and ``cov`` will
correspond to the compensated values (if rkk is provided). The raw, uncompensated values
are available as ``rawxi`` and ``raw_varxi``.
Parameters:
rkk (NKKCorrelation): The cross-correlation using random locations as the lenses (RKK),
if desired. (default: None)
Returns:
Tuple containing
- zeta = array of :math:`\zeta`
- varzeta = array of variance estimates of :math:`\zeta`
"""
if rkk is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
self._zeta = self.raw_zeta - rkk.zeta
self._rkk = rkk
if (rkk.npatch1 not in (1,self.npatch1) or rkk.npatch2 != self.npatch2
or rkk.npatch3 != self.npatch3):
raise RuntimeError("RKK must be run with the same patches as DKK")
if len(self.results) > 0:
# If there are any rkk patch pairs that aren't in results (e.g. due to different
# edge effects among the various pairs in consideration), then we need to add
# some dummy results to make sure all the right pairs are computed when we make
# the vectors for the covariance matrix.
template = next(iter(self.results.values())) # Just need something to copy.
for ijk in rkk.results:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + rkk.varzeta
else:
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._rkk is not None:
# If rkk has npatch1 = 1, adjust pairs appropriately
if self._rkk.npatch1 == 1 and not all([p[0] == 0 for p in pairs]):
pairs = [(0,j,k,w) for i,j,k,w in pairs if i == j]
pairs = self._rkk._keep_ok(pairs)
self._rkk._calculate_xi_from_pairs(pairs, corr_only=True)
self._zeta = self.raw_zeta - self._rkk.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zeta The estimator of :math:`\zeta` (For LogMultipole, this is split
into real and imaginary parts, zetar and zetai.)
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
if self.bin_type == 'LogMultipole':
return ['zetar', 'zetai', 'sigma_zeta']
else:
return ['zeta', 'sigma_zeta']
@property
def _write_class_data(self):
if self.bin_type == 'LogMultipole':
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
else:
return [self.zeta, np.sqrt(self.varzeta)]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
if self.bin_type == 'LogMultipole':
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
else:
self._z[0] = data['zeta'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]
[docs]class KNKCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point scalar-count-scalar correlation
function, where as usual K represents any spin-0 scalar field.
With this class, point 2 of the triangle (i.e. the vertex opposite d2) is the one with the
scalar value. Use `NKKCorrelation` and `KKNCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> knk = treecorr.NKKCorrelation(config)
>>> knk.process(cat1, cat2, cat1) # Compute cross-correlation of two fields.
>>> knk.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> knk.write(file_name) # Write out to a file.
>>> krk.process(cat1, rand, cat1) # Compute cross-correlation with randoms.
>>> knk.calculateZeta(krk=krk) # Calculate zeta using randoms
>>> zeta = knk.zeta # Access correlation function
>>> zetar = knk.zetar # Or access real and imaginary parts separately
>>> zetai = knk.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'KNKCorrelation'
_letter1 = 'K'
_letter2 = 'N'
_letter3 = 'K'
_letters = 'KNK'
_builder = _treecorr.KNKCorr
_calculateVar1 = staticmethod(calculateVarK)
_calculateVar2 = lambda *args, **kwargs: None
_calculateVar3 = staticmethod(calculateVarK)
_sig1 = 'sig_k'
_sig2 = None
_sig3 = 'sig_k'
_default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._krk = None
shape = self.data_shape
self._z[0] = np.zeros(shape, dtype=float)
if self.bin_type == 'LogMultipole':
self._z[1] = np.zeros(shape, dtype=float)
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building KNKCorr')
@property
def raw_zeta(self):
return self._z[0]
@property
def zeta(self):
if self._zeta is None:
if self._z[1].size:
return self._z[0] + 1j * self._z[1]
else:
return self._z[0]
else:
return self._zeta
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._krk is not None and self._krk is not True:
ret._krk = self._krk.copy()
return ret
[docs] def finalize(self, vark1, vark2):
"""Finalize the calculation of the correlation function.
Parameters:
vark1 (float): The variance of the first scalar field.
vark2 (float): The variance of the second scalar field.
"""
self._finalize()
self._var_num = vark1 * vark2
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._krk = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, krk=None):
r"""Calculate the correlation function possibly given another correlation function
that uses random points for the foreground objects.
- If krk is None, the simple correlation function (self.zeta) is returned.
- If krk is not None, then a compensated calculation is done:
:math:`\zeta = (KDK - KRK)`, where KDK represents the correlation of the kappa
field with the data points and KRK represents the correlation with random points.
After calling this function, the attributes ``zeta``, ``varzeta`` and ``cov`` will
correspond to the compensated values (if krk is provided). The raw, uncompensated values
are available as ``rawxi`` and ``raw_varxi``.
Parameters:
krk (KNKCorrelation): The cross-correlation using random locations as the lenses (RKK),
if desired. (default: None)
Returns:
Tuple containing
- zeta = array of :math:`\zeta`
- varzeta = array of variance estimates of :math:`\zeta`
"""
if krk is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
self._zeta = self.raw_zeta - krk.zeta
self._krk = krk
if (krk.npatch2 not in (1,self.npatch2) or krk.npatch1 != self.npatch1
or krk.npatch3 != self.npatch3):
raise RuntimeError("KRK must be run with the same patches as KDK")
if len(self.results) > 0:
# If there are any krk patch pairs that aren't in results (e.g. due to different
# edge effects among the various pairs in consideration), then we need to add
# some dummy results to make sure all the right pairs are computed when we make
# the vectors for the covariance matrix.
template = next(iter(self.results.values())) # Just need something to copy.
for ijk in krk.results:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + krk.varzeta
else:
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._krk is not None:
# If krk has npatch2 = 1, adjust pairs appropriately
if self._krk.npatch2 == 1 and not all([p[1] == 0 for p in pairs]):
pairs = [(i,0,k,w) for i,j,k,w in pairs if i == j]
pairs = self._krk._keep_ok(pairs)
self._krk._calculate_xi_from_pairs(pairs, corr_only=True)
self._zeta = self.raw_zeta - self._krk.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zeta The estimator of :math:`\zeta` (For LogMultipole, this is split
into real and imaginary parts, zetar and zetai.)
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
if self.bin_type == 'LogMultipole':
return ['zetar', 'zetai', 'sigma_zeta']
else:
return ['zeta', 'sigma_zeta']
@property
def _write_class_data(self):
if self.bin_type == 'LogMultipole':
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
else:
return [self.zeta, np.sqrt(self.varzeta)]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
if self.bin_type == 'LogMultipole':
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
else:
self._z[0] = data['zeta'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]
[docs]class KKNCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point scalar-scalar-count correlation
function, where as usual K represents any spin-0 scalar field.
With this class, point 3 of the triangle (i.e. the vertex opposite d3) is the one with the
scalar value. Use `NKKCorrelation` and `KNKCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> kkn = treecorr.KKNCorrelation(config)
>>> kkn.process(cat1, cat2) # Compute cross-correlation of two fields.
>>> kkn.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> kkn.write(file_name) # Write out to a file.
>>> kkr.process(cat1, rand) # Compute cross-correlation with randoms.
>>> kkn.calculateZeta(kkr=kkr) # Calculate zeta using randoms
>>> zeta = kkn.zeta # Access correlation function
>>> zetar = kkn.zetar # Or access real and imaginary parts separately
>>> zetai = kkn.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'KKNCorrelation'
_letter1 = 'K'
_letter2 = 'K'
_letter3 = 'N'
_letters = 'KKN'
_builder = _treecorr.KKNCorr
_calculateVar1 = staticmethod(calculateVarK)
_calculateVar2 = staticmethod(calculateVarK)
_calculateVar3 = lambda *args, **kwargs: None
_sig1 = 'sig_k'
_sig2 = 'sig_k'
_sig3 = None
_default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._kkr = None
shape = self.data_shape
self._z[0] = np.zeros(shape, dtype=float)
if self.bin_type == 'LogMultipole':
self._z[1] = np.zeros(shape, dtype=float)
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building KKNCorr')
@property
def raw_zeta(self):
return self._z[0]
@property
def zeta(self):
if self._zeta is None:
if self._z[1].size:
return self._z[0] + 1j * self._z[1]
else:
return self._z[0]
else:
return self._zeta
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._kkr is not None and self._kkr is not True:
ret._kkr = self._kkr.copy()
return ret
[docs] def finalize(self, vark1, vark2):
"""Finalize the calculation of the correlation function.
Parameters:
vark1 (float): The variance of the first scalar field.
vark2 (float): The variance of the second scalar field.
"""
self._finalize()
self._var_num = vark1 * vark2
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._kkr = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, kkr=None):
r"""Calculate the correlation function possibly given another correlation function
that uses random points for the foreground objects.
- If kkr is None, the simple correlation function (self.zeta) is returned.
- If kkr is not None, then a compensated calculation is done:
:math:`\zeta = (DKK - RKK)`, where DKK represents the correlation of the kappa
field with the data points and RKK represents the correlation with random points.
After calling this function, the attributes ``zeta``, ``varzeta`` and ``cov`` will
correspond to the compensated values (if kkr is provided). The raw, uncompensated values
are available as ``rawxi`` and ``raw_varxi``.
Parameters:
kkr (KKNCorrelation): The cross-correlation using random locations as the lenses (RKK),
if desired. (default: None)
Returns:
Tuple containing
- zeta = array of :math:`\zeta`
- varzeta = array of variance estimates of :math:`\zeta`
"""
if kkr is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
self._zeta = self.raw_zeta - kkr.zeta
self._kkr = kkr
if (kkr.npatch3 not in (1,self.npatch3) or kkr.npatch1 != self.npatch1
or kkr.npatch2 != self.npatch2):
raise RuntimeError("KKR must be run with the same patches as KKD")
if len(self.results) > 0:
# If there are any kkr patch pairs that aren't in results (e.g. due to different
# edge effects among the various pairs in consideration), then we need to add
# some dummy results to make sure all the right pairs are computed when we make
# the vectors for the covariance matrix.
template = next(iter(self.results.values())) # Just need something to copy.
for ijk in kkr.results:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + kkr.varzeta
else:
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._kkr is not None:
# If kkr has npatch3 = 1, adjust pairs appropriately
if self._kkr.npatch3 == 1 and not all([p[2] == 0 for p in pairs]):
pairs = [(i,j,0,w) for i,j,k,w in pairs if i == k]
pairs = self._kkr._keep_ok(pairs)
self._kkr._calculate_xi_from_pairs(pairs, corr_only=True)
self._zeta = self.raw_zeta - self._kkr.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zeta The estimator of :math:`\zeta` (For LogMultipole, this is split
into real and imaginary parts, zetar and zetai.)
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
if self.bin_type == 'LogMultipole':
return ['zetar', 'zetai', 'sigma_zeta']
else:
return ['zeta', 'sigma_zeta']
@property
def _write_class_data(self):
if self.bin_type == 'LogMultipole':
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
else:
return [self.zeta, np.sqrt(self.varzeta)]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
if self.bin_type == 'LogMultipole':
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
else:
self._z[0] = data['zeta'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]