# Copyright (c) 2003-2024 by Mike Jarvis
#
# TreeCorr is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions, and the disclaimer given in the accompanying LICENSE
# file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the disclaimer given in the documentation
# and/or other materials provided with the distribution.
"""
.. module:: nngcorrelation
"""
import numpy as np
from . import _treecorr
from .catalog import calculateVarG
from .corr3base import Corr3
[docs]class GNNCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point scalar-count-count correlation
function, where as usual G represents any spin-0 scalar field.
With this class, point 1 of the triangle (i.e. the vertex opposite d1) is the one with the
scalar value. Use `NGNCorrelation` and `NNGCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> gnn = treecorr.GNNCorrelation(config)
>>> gnn.process(cat1, cat2) # Compute cross-correlation of two fields.
>>> gnn.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> grr.process(cat1, rand) # Compute cross-correlation with randoms.
>>> gdr.process(cat1, cat2, rand) # Compute cross-correlation with randoms and data
>>> gnn.write(file_name) # Write out to a file.
>>> gnn.calculateZeta(grr=grr, gdr=gdr) # Calculate zeta using randoms
>>> zeta = gnn.zeta # Access correlation function
>>> zetar = gnn.zetar # Or access real and imaginary parts separately
>>> zetai = gnn.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'GNNCorrelation'
_letter1 = 'G'
_letter2 = 'N'
_letter3 = 'N'
_letters = 'GNN'
_builder = _treecorr.GNNCorr
_calculateVar1 = staticmethod(calculateVarG)
_calculateVar2 = lambda *args, **kwargs: None
_calculateVar3 = lambda *args, **kwargs: None
_sig1 = 'sig_sn (per component)'
_sig2 = None
_sig3 = None
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._grr = None
self._gdr = None
self._grd = None
shape = self.data_shape
self._z[0:2] = [np.zeros(shape, dtype=float) for _ in range(2)]
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building GNNCorr')
@property
def raw_zeta(self):
return self._z[0] + 1j * self._z[1]
@property
def zeta(self):
if self._zeta is None:
return self.raw_zeta
else:
return self._zeta
@property
def zetar(self):
if self._zeta is None:
return self._z[0]
else:
return np.real(self._zeta)
@property
def zetai(self):
if self._zeta is None:
return self._z[1]
else:
return np.imag(self._zeta)
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._grr is not None and self._grr is not True:
ret._grr = self._grr.copy()
if self._gdr is not None and self._gdr is not True:
ret._gdr = self._gdr.copy()
if self._grd is not None and self._grd is not True:
ret._grd = self._grd.copy()
return ret
[docs] def finalize(self, varg):
"""Finalize the calculation of the correlation function.
Parameters:
varg (float): The variance of the shear field.
"""
self._finalize()
self._var_num = varg
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._grr = None
self._grd = None
self._gdr = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, grr=None, gdr=None, grd=None):
r"""Calculate the correlation function given another correlation function of random
points using the same mask, and possibly cross correlations of the data and random.
The grr value is the `GNNCorrelation` function for random points with the scalar field.
One can also provide a cross correlation of the count data with randoms and the scalar.
- If grr is None, the simple correlation function (self.zeta) is returned.
- If only grr is given the compensated value :math:`\zeta = GDD - GRR` is returned.
- if gdr is given and grd is None (or vice versa), then :math:`\zeta = GDD - 2GDR + GRR`
is returned.
- If gdr and grd are both given, then :math:`\zeta = GDD - GDR - GRD + GRR` is returned.
where GDD is the data GNN correlation function, which is the current object.
After calling this method, you can use the `Corr2.estimate_cov` method or use this
correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and
varzeta returned from this function will be available as attributes.
Parameters:
grr (GNNCorrelation): The correlation of the random points with the scalar field
(GRR) (default: None)
gdr (GNNCorrelation): The cross-correlation of the data with both randoms and the
scalar field (GDR), if desired. (default: None)
grd (GNNCorrelation): The cross-correlation of the randoms with both the data and the
scalar field (GRD), if desired. (default: None)
Returns:
Tuple containing:
- zeta = array of :math:`\zeta(r)`
- varzeta = an estimate of the variance of :math:`\zeta(r)`
"""
# Calculate zeta based on which randoms are provided.
if grr is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
if gdr is None and grd is None:
self._zeta = self.raw_zeta - grr.zeta
elif grd is not None and gdr is None:
self._zeta = self.raw_zeta - 2.*grd.zeta + grr.zeta
elif gdr is not None and grd is None:
self._zeta = self.raw_zeta - 2.*gdr.zeta + grr.zeta
else:
self._zeta = self.raw_zeta - gdr.zeta - grd.zeta + grr.zeta
self._grr = grr
self._gdr = gdr
self._grd = grd
if (grr.npatch2 not in (1,self.npatch2) or grr.npatch3 not in (1,self.npatch3)
or grr.npatch1 != self.npatch1):
raise RuntimeError("GRR must be run with the same patches as GDD")
if grd is not None and (grd.npatch2 not in (1,self.npatch2)
or grd.npatch1 != self.npatch1
or grd.npatch3 != self.npatch3):
raise RuntimeError("GRD must be run with the same patches as GDD")
if gdr is not None and (gdr.npatch3 not in (1,self.npatch3)
or gdr.npatch1 != self.npatch1
or gdr.npatch2 != self.npatch2):
raise RuntimeError("GDR must be run with the same patches as GDD")
if len(self.results) > 0:
template = next(iter(self.results.values())) # Just need something to copy.
all_keys = list(grr.results.keys())
if grd is not None:
all_keys += list(grd.results.keys())
if gdr is not None:
all_keys += list(gdr.results.keys())
for ijk in all_keys:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + grr.varzeta
if grd is not None:
self._comp_varzeta += grd.varzeta
if gdr is not None:
self._comp_varzeta += gdr.varzeta
else:
if grd is not None:
raise TypeError("grd is invalid if grr is None")
if gdr is not None:
raise TypeError("gdr is invalid if grr is None")
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._grr is not None:
# If r doesn't have patches, then convert all (i,i,i) pairs to (i,0,0).
if self._grr.npatch2 == 1 and not all([p[1] == 0 for p in pairs]):
pairs1 = [(i,0,0,w) for i,j,k,w in pairs if i == j == k]
else:
pairs1 = pairs
pairs1 = self._grr._keep_ok(pairs1)
self._grr._calculate_xi_from_pairs(pairs1, corr_only=True)
if self._gdr is not None:
pairs2 = pairs
if self._gdr.npatch3 == 1 and not all([p[2] == 0 for p in pairs]):
pairs2 = [(i,j,0,w) for i,j,k,w in pairs if j == k]
pairs2 = self._gdr._keep_ok(pairs2)
self._gdr._calculate_xi_from_pairs(pairs2, corr_only=True)
if self._grd is not None:
pairs3 = pairs
if self._grd.npatch2 == 1 and not all([p[1] == 0 for p in pairs]):
pairs3 = [(i,0,k,w) for i,j,k,w in pairs if i == k]
pairs3 = self._grd._keep_ok(pairs3)
self._grd._calculate_xi_from_pairs(pairs3, corr_only=True)
if self._grr is None:
self._zeta = None
elif self._gdr is None and self._grd is None:
self._zeta = self.raw_zeta - self._grr.zeta
elif self._grd is not None and self._gdr is None:
self._zeta = self.raw_zeta - 2.*self._grd.zeta + self._grr.zeta
elif self._gdr is not None and self._grd is None:
self._zeta = self.raw_zeta - 2.*self._gdr.zeta + self._grr.zeta
else:
self._zeta = self.raw_zeta - self._gdr.zeta - self._grd.zeta + self._grr.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zetar The real part of the estimator of :math:`\zeta`
zetai The imag part of the estimator of :math:`\zeta`
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
return ['zetar', 'zetai', 'sigma_zeta']
@property
def _write_class_data(self):
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]
[docs]class NGNCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point count-scalar-count correlation
function, where as usual G represents any spin-0 scalar field.
With this class, point 2 of the triangle (i.e. the vertex opposite d2) is the one with the
scalar value. Use `GNNCorrelation` and `NNGCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> ngn = treecorr.GNNCorrelation(config)
>>> ngn.process(cat1, cat2, cat1) # Compute cross-correlation of two fields.
>>> ngn.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> rgr.process(rand, cat2, rand) # Compute cross-correlation with randoms.
>>> dgr.process(cat1, cat2, rand) # Compute cross-correlation with randoms and data
>>> ngn.write(file_name) # Write out to a file.
>>> ngn.calculateZeta(rgr=rgr, dgr=dgr) # Calculate zeta using randoms
>>> zeta = ngn.zeta # Access correlation function
>>> zetar = ngn.zetar # Or access real and imaginary parts separately
>>> zetai = ngn.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'NGNCorrelation'
_letter1 = 'N'
_letter2 = 'G'
_letter3 = 'N'
_letters = 'NGN'
_builder = _treecorr.NGNCorr
_calculateVar1 = lambda *args, **kwargs: None
_calculateVar2 = staticmethod(calculateVarG)
_calculateVar3 = lambda *args, **kwargs: None
_sig1 = None
_sig2 = 'sig_sn (per component)'
_sig3 = None
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._rgr = None
self._dgr = None
self._rgd = None
shape = self.data_shape
self._z[0] = np.zeros(shape, dtype=float)
self._z[1] = np.zeros(shape, dtype=float)
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building NGNCorr')
@property
def raw_zeta(self):
return self._z[0] + 1j * self._z[1]
@property
def zeta(self):
if self._zeta is None:
return self.raw_zeta
else:
return self._zeta
@property
def zetar(self):
if self._zeta is None:
return self._z[0]
else:
return np.real(self._zeta)
@property
def zetai(self):
if self._zeta is None:
return self._z[1]
else:
return np.imag(self._zeta)
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._rgr is not None and self._rgr is not True:
ret._rgr = self._rgr.copy()
if self._dgr is not None and self._dgr is not True:
ret._dgr = self._dgr.copy()
if self._rgd is not None and self._rgd is not True:
ret._rgd = self._rgd.copy()
return ret
[docs] def finalize(self, varg):
"""Finalize the calculation of the correlation function.
Parameters:
varg (float): The variance of the shear field.
"""
self._finalize()
self._var_num = varg
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._rgr = None
self._rgd = None
self._dgr = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, rgr=None, dgr=None, rgd=None):
r"""Calculate the correlation function given another correlation function of random
points using the same mask, and possibly cross correlations of the data and random.
The rgr value is the `NGNCorrelation` function for random points with the scalar field.
One can also provide a cross correlation of the count data with randoms and the scalar.
- If rgr is None, the simple correlation function (self.zeta) is returned.
- If only rgr is given the compensated value :math:`\zeta = DGD - RGR` is returned.
- if dgr is given and rgd is None (or vice versa), then :math:`\zeta = DGD - 2DGR + RGR`
is returned.
- If dgr and rgd are both given, then :math:`\zeta = DGD - DGR - RGD + RGR` is returned.
where DGD is the data NGN correlation function, which is the current object.
After calling this method, you can use the `Corr2.estimate_cov` method or use this
correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and
varzeta returned from this function will be available as attributes.
Parameters:
rgr (NGNCorrelation): The correlation of the random points with the scalar field
(RGR) (default: None)
dgr (NGNCorrelation): The cross-correlation of the data with both randoms and the
scalar field (DGR), if desired. (default: None)
rgd (NGNCorrelation): The cross-correlation of the randoms with both the data and the
scalar field (RGD), if desired. (default: None)
Returns:
Tuple containing:
- zeta = array of :math:`\zeta(r)`
- varzeta = an estimate of the variance of :math:`\zeta(r)`
"""
# Calculate zeta based on which randoms are provided.
if rgr is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
if dgr is None and rgd is None:
self._zeta = self.raw_zeta - rgr.zeta
elif rgd is not None and dgr is None:
self._zeta = self.raw_zeta - 2.*rgd.zeta + rgr.zeta
elif dgr is not None and rgd is None:
self._zeta = self.raw_zeta - 2.*dgr.zeta + rgr.zeta
else:
self._zeta = self.raw_zeta - dgr.zeta - rgd.zeta + rgr.zeta
self._rgr = rgr
self._dgr = dgr
self._rgd = rgd
if (rgr.npatch1 not in (1,self.npatch1) or rgr.npatch3 not in (1,self.npatch3)
or rgr.npatch2 != self.npatch2):
raise RuntimeError("RGR must be run with the same patches as DGD")
if rgd is not None and (rgd.npatch1 not in (1,self.npatch1)
or rgd.npatch2 != self.npatch2
or rgd.npatch3 != self.npatch3):
raise RuntimeError("RGD must be run with the same patches as DGD")
if dgr is not None and (dgr.npatch3 not in (1,self.npatch3)
or dgr.npatch1 != self.npatch1
or dgr.npatch2 != self.npatch2):
raise RuntimeError("DGR must be run with the same patches as DGD")
if len(self.results) > 0:
template = next(iter(self.results.values())) # Just need something to copy.
all_keys = list(rgr.results.keys())
if rgd is not None:
all_keys += list(rgd.results.keys())
if dgr is not None:
all_keys += list(dgr.results.keys())
for ijk in all_keys:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + rgr.varzeta
if rgd is not None:
self._comp_varzeta += rgd.varzeta
if dgr is not None:
self._comp_varzeta += dgr.varzeta
else:
if rgd is not None:
raise TypeError("rgd is invalid if rgr is None")
if dgr is not None:
raise TypeError("dgr is invalid if rgr is None")
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._rgr is not None:
# If r doesn't have patches, then convert all (i,i,i) pairs to (0,i,0).
if self._rgr.npatch1 == 1 and not all([p[0] == 0 for p in pairs]):
pairs1 = [(0,j,0,w) for i,j,k,w in pairs if i == j == k]
else:
pairs1 = pairs
pairs1 = self._rgr._keep_ok(pairs1)
self._rgr._calculate_xi_from_pairs(pairs1, corr_only=True)
if self._dgr is not None:
pairs2 = pairs
if self._dgr.npatch3 == 1 and not all([p[2] == 0 for p in pairs]):
pairs2 = [(i,j,0,w) for i,j,k,w in pairs if j == k]
pairs2 = self._dgr._keep_ok(pairs2)
self._dgr._calculate_xi_from_pairs(pairs2, corr_only=True)
if self._rgd is not None:
pairs3 = pairs
if self._rgd.npatch1 == 1 and not all([p[0] == 0 for p in pairs]):
pairs3 = [(0,j,k,w) for i,j,k,w in pairs if i == k]
pairs3 = self._rgd._keep_ok(pairs3)
self._rgd._calculate_xi_from_pairs(pairs3, corr_only=True)
if self._rgr is None:
self._zeta = None
elif self._dgr is None and self._rgd is None:
self._zeta = self.raw_zeta - self._rgr.zeta
elif self._rgd is not None and self._dgr is None:
self._zeta = self.raw_zeta - 2.*self._rgd.zeta + self._rgr.zeta
elif self._dgr is not None and self._rgd is None:
self._zeta = self.raw_zeta - 2.*self._dgr.zeta + self._rgr.zeta
else:
self._zeta = self.raw_zeta - self._dgr.zeta - self._rgd.zeta + self._rgr.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zetar The real part of the estimator of :math:`\zeta`
zetai The imag part of the estimator of :math:`\zeta`
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
return ['zetar', 'zetai', 'sigma_zeta']
@property
def _write_class_data(self):
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]
[docs]class NNGCorrelation(Corr3):
r"""This class handles the calculation and storage of a 3-point count-count-scalar correlation
function, where as usual G represents any spin-0 scalar field.
With this class, point 3 of the triangle (i.e. the vertex opposite d3) is the one with the
scalar value. Use `GNNCorrelation` and `NGNCorrelation` for classes with the scalar in the
other two positions.
See the doc string of `Corr3` for a description of how the triangles are binned along
with the attributes related to the different binning options.
In addition to the attributes common to all `Corr3` subclasses, objects of this class
hold the following attributes:
Attributes:
zeta: The correlation function, :math:`\zeta`.
varzeta: The variance estimate, only including the shot noise propagated into the
final correlation.
The typical usage pattern is as follows:
>>> nng = treecorr.NNGCorrelation(config)
>>> nng.process(cat1, cat2) # Compute cross-correlation of two fields.
>>> nng.process(cat1, cat2, cat3) # Compute cross-correlation of three fields.
>>> rrg.process(rand, cat2) # Compute cross-correlation with randoms.
>>> drg.process(cat1, rand, cat2) # Compute cross-correlation with randoms and data
>>> nng.write(file_name) # Write out to a file.
>>> nng.calculateZeta(rrg=rrg, drg=drg) # Calculate zeta using randoms
>>> zeta = nng.zeta # Access correlation function
>>> zetar = nng.zetar # Or access real and imaginary parts separately
>>> zetai = nng.zetai
Parameters:
config (dict): A configuration dict that can be used to pass in kwargs if desired.
This dict is allowed to have addition entries besides those listed
in `Corr3`, which are ignored here. (default: None)
logger: If desired, a logger object for logging. (default: None, in which case
one will be built according to the config dict's verbose level.)
Keyword Arguments:
**kwargs: See the documentation for `Corr3` for the list of allowed keyword
arguments, which may be passed either directly or in the config dict.
"""
_cls = 'NNGCorrelation'
_letter1 = 'N'
_letter2 = 'N'
_letter3 = 'G'
_letters = 'NNG'
_builder = _treecorr.NNGCorr
_calculateVar1 = lambda *args, **kwargs: None
_calculateVar2 = lambda *args, **kwargs: None
_calculateVar3 = staticmethod(calculateVarG)
_sig1 = None
_sig2 = None
_sig3 = 'sig_sn (per component)'
[docs] def __init__(self, config=None, *, logger=None, **kwargs):
super().__init__(config, logger=logger, **kwargs)
self._rrg = None
self._drg = None
self._rdg = None
shape = self.data_shape
self._z[0] = np.zeros(shape, dtype=float)
self._z[1] = np.zeros(shape, dtype=float)
self._zeta = None
self._comp_varzeta = None
self.logger.debug('Finished building NNGCorr')
@property
def raw_zeta(self):
return self._z[0] + 1j * self._z[1]
@property
def zeta(self):
if self._zeta is None:
return self.raw_zeta
else:
return self._zeta
@property
def zetar(self):
if self._zeta is None:
return self._z[0]
else:
return np.real(self._zeta)
@property
def zetai(self):
if self._zeta is None:
return self._z[1]
else:
return np.imag(self._zeta)
[docs] def copy(self):
ret = super().copy()
# True is possible during read before we finish reading in these attributes.
if self._rrg is not None and self._rrg is not True:
ret._rrg = self._rrg.copy()
if self._drg is not None and self._drg is not True:
ret._drg = self._drg.copy()
if self._rdg is not None and self._rdg is not True:
ret._rdg = self._rdg.copy()
return ret
[docs] def finalize(self, varg):
"""Finalize the calculation of the correlation function.
Parameters:
varg (float): The variance of the shear field.
"""
self._finalize()
self._var_num = varg
if self.bin_type in ['LogSAS', 'LogMultipole']:
self._var_num *= 2
@property
def raw_varzeta(self):
if self._varzeta is None:
self._calculate_varzeta(1)
return self._varzeta[0]
@property
def varzeta(self):
if self._comp_varzeta is None:
return self.raw_varzeta
else:
return self._comp_varzeta
def _clear(self):
super()._clear()
self._rrg = None
self._rdg = None
self._drg = None
self._zeta = None
self._comp_varzeta = None
[docs] def calculateZeta(self, *, rrg=None, drg=None, rdg=None):
r"""Calculate the correlation function given another correlation function of random
points using the same mask, and possibly cross correlations of the data and random.
The rrg value is the `NNGCorrelation` function for random points with the scalar field.
One can also provide a cross correlation of the count data with randoms and the scalar.
- If rrg is None, the simple correlation function (self.zeta) is returned.
- If only rrg is given the compensated value :math:`\zeta = DDG - RRG` is returned.
- if drg is given and rdg is None (or vice versa), then :math:`\zeta = DDG - 2DRG + RRG`
is returned.
- If drg and rdg are both given, then :math:`\zeta = DDG - DRG - RDG + RRG` is returned.
where DDG is the data NNG correlation function, which is the current object.
After calling this method, you can use the `Corr2.estimate_cov` method or use this
correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and
varzeta returned from this function will be available as attributes.
Parameters:
rrg (NNGCorrelation): The correlation of the random points with the scalar field
(RRG) (default: None)
drg (NNGCorrelation): The cross-correlation of the data with both randoms and the
scalar field (DRG), if desired. (default: None)
rdg (NNGCorrelation): The cross-correlation of the randoms with both the data and the
scalar field (RDG), if desired. (default: None)
Returns:
Tuple containing:
- zeta = array of :math:`\zeta(r)`
- varzeta = an estimate of the variance of :math:`\zeta(r)`
"""
# Calculate zeta based on which randoms are provided.
if rrg is not None:
if self.bin_type == 'LogMultipole':
raise TypeError("calculateZeta is not valid for LogMultipole binning")
if drg is None and rdg is None:
self._zeta = self.raw_zeta - rrg.zeta
elif rdg is not None and drg is None:
self._zeta = self.raw_zeta - 2.*rdg.zeta + rrg.zeta
elif drg is not None and rdg is None:
self._zeta = self.raw_zeta - 2.*drg.zeta + rrg.zeta
else:
self._zeta = self.raw_zeta - drg.zeta - rdg.zeta + rrg.zeta
self._rrg = rrg
self._drg = drg
self._rdg = rdg
if (rrg.npatch1 not in (1,self.npatch1) or rrg.npatch2 not in (1,self.npatch2)
or rrg.npatch3 != self.npatch3):
raise RuntimeError("RRG must be run with the same patches as DDG")
if rdg is not None and (rdg.npatch1 not in (1,self.npatch1)
or rdg.npatch2 != self.npatch2
or rdg.npatch3 != self.npatch3):
raise RuntimeError("RDG must be run with the same patches as DDG")
if drg is not None and (drg.npatch2 not in (1,self.npatch2)
or drg.npatch1 != self.npatch1
or drg.npatch3 != self.npatch3):
raise RuntimeError("DRG must be run with the same patches as DDG")
if len(self.results) > 0:
template = next(iter(self.results.values())) # Just need something to copy.
all_keys = list(rrg.results.keys())
if rdg is not None:
all_keys += list(rdg.results.keys())
if drg is not None:
all_keys += list(drg.results.keys())
for ijk in all_keys:
if ijk in self.results: continue
new_cij = template.copy()
new_cij._z[0][:] = 0
new_cij.weight[:] = 0
self.results[ijk] = new_cij
self.__dict__.pop('_ok',None)
self._cov = self.estimate_cov(self.var_method)
self._comp_varzeta = np.zeros(self.data_shape, dtype=float)
self._comp_varzeta.ravel()[:] = self.cov_diag
else:
self._comp_varzeta = self.raw_varzeta + rrg.varzeta
if rdg is not None:
self._comp_varzeta += rdg.varzeta
if drg is not None:
self._comp_varzeta += drg.varzeta
else:
if rdg is not None:
raise TypeError("rdg is invalid if rrg is None")
if drg is not None:
raise TypeError("drg is invalid if rrg is None")
self._zeta = self.raw_zeta
self._comp_varzeta = None
return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only):
super()._calculate_xi_from_pairs(pairs, corr_only)
if self._rrg is not None:
# If r doesn't have patches, then convert all (i,i,i) pairs to (0,0,i).
if self._rrg.npatch1 == 1 and not all([p[0] == 0 for p in pairs]):
pairs1 = [(0,0,k,w) for i,j,k,w in pairs if i == j == k]
else:
pairs1 = pairs
pairs1 = self._rrg._keep_ok(pairs1)
self._rrg._calculate_xi_from_pairs(pairs1, corr_only=True)
if self._drg is not None:
pairs2 = pairs
if self._drg.npatch2 == 1 and not all([p[1] == 0 for p in pairs]):
pairs2 = [(i,0,k,w) for i,j,k,w in pairs if j == k]
pairs2 = self._drg._keep_ok(pairs2)
self._drg._calculate_xi_from_pairs(pairs2, corr_only=True)
if self._rdg is not None:
pairs3 = pairs
if self._rdg.npatch1 == 1 and not all([p[0] == 0 for p in pairs]):
pairs3 = [(0,j,k,w) for i,j,k,w in pairs if i == k]
pairs3 = self._rdg._keep_ok(pairs3)
self._rdg._calculate_xi_from_pairs(pairs3, corr_only=True)
if self._rrg is None:
self._zeta = None
elif self._drg is None and self._rdg is None:
self._zeta = self.raw_zeta - self._rrg.zeta
elif self._rdg is not None and self._drg is None:
self._zeta = self.raw_zeta - 2.*self._rdg.zeta + self._rrg.zeta
elif self._drg is not None and self._rdg is None:
self._zeta = self.raw_zeta - 2.*self._drg.zeta + self._rrg.zeta
else:
self._zeta = self.raw_zeta - self._drg.zeta - self._rdg.zeta + self._rrg.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False,
write_cov=False):
super().write(file_name, file_type=file_type, precision=precision,
write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format(
r"""
zetar The real part of the estimator of :math:`\zeta`
zetai The imag part of the estimator of :math:`\zeta`
sigma_zeta The sqrt of the variance estimate of :math:`\zeta`.
""")
@property
def _write_class_col_names(self):
return ['zetar', 'zetai', 'sigma_zeta']
@property
def _write_class_data(self):
return [self._z[0], self._z[1], np.sqrt(self.varzeta) ]
def _read_from_data(self, data, params):
super()._read_from_data(data, params)
s = self.data_shape
self._z[0] = data['zetar'].reshape(s)
self._z[1] = data['zetai'].reshape(s)
self._comp_varzeta = data['sigma_zeta'].reshape(s)**2
self._varzeta = [self._comp_varzeta]