Source code for treecorr.nnkcorrelation

# Copyright (c) 2003-2024 by Mike Jarvis
#
# TreeCorr is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions, and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions, and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.

"""
.. module:: nnkcorrelation
"""

import numpy as np

from . import _treecorr
from .catalog import calculateVarK
from .corr3base import Corr3


[docs]class KNNCorrelation(Corr3): r"""This class handles the calculation and storage of a 3-point scalar-count-count correlation function, where as usual K represents any spin-0 scalar field. With this class, point 1 of the triangle (i.e. the vertex opposite d1) is the one with the scalar value. Use `NKNCorrelation` and `NNKCorrelation` for classes with the scalar in the other two positions. See the doc string of `Corr3` for a description of how the triangles are binned along with the attributes related to the different binning options. In addition to the attributes common to all `Corr3` subclasses, objects of this class hold the following attributes: Attributes: zeta: The correlation function, :math:`\zeta`. varzeta: The variance estimate, only including the shot noise propagated into the final correlation. The typical usage pattern is as follows: >>> knn = treecorr.KNNCorrelation(config) >>> knn.process(cat1, cat2) # Compute cross-correlation of two fields. >>> knn.process(cat1, cat2, cat3) # Compute cross-correlation of three fields. >>> krr.process(cat1, rand) # Compute cross-correlation with randoms. >>> kdr.process(cat1, cat2, rand) # Compute cross-correlation with randoms and data >>> knn.write(file_name) # Write out to a file. >>> knn.calculateZeta(krr=krr, kdr=kdr) # Calculate zeta using randoms >>> zeta = knn.zeta # Access correlation function >>> zetar = knn.zetar # Or access real and imaginary parts separately >>> zetai = knn.zetai Parameters: config (dict): A configuration dict that can be used to pass in kwargs if desired. This dict is allowed to have addition entries besides those listed in `Corr3`, which are ignored here. (default: None) logger: If desired, a logger object for logging. (default: None, in which case one will be built according to the config dict's verbose level.) Keyword Arguments: **kwargs: See the documentation for `Corr3` for the list of allowed keyword arguments, which may be passed either directly or in the config dict. """ _cls = 'KNNCorrelation' _letter1 = 'K' _letter2 = 'N' _letter3 = 'N' _letters = 'KNN' _builder = _treecorr.KNNCorr _calculateVar1 = staticmethod(calculateVarK) _calculateVar2 = lambda *args, **kwargs: None _calculateVar3 = lambda *args, **kwargs: None _sig1 = 'sig_k' _sig2 = None _sig3 = None _default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs): super().__init__(config, logger=logger, **kwargs) self._krr = None self._kdr = None self._krd = None shape = self.data_shape self._z[0] = np.zeros(shape, dtype=float) if self.bin_type == 'LogMultipole': self._z[1] = np.zeros(shape, dtype=float) self._zeta = None self._comp_varzeta = None self.logger.debug('Finished building KNNCorr')
@property def raw_zeta(self): return self._z[0] @property def zeta(self): if self._zeta is None: if self._z[1].size: return self._z[0] + 1j * self._z[1] else: return self._z[0] else: return self._zeta
[docs] def copy(self): ret = super().copy() # True is possible during read before we finish reading in these attributes. if self._krr is not None and self._krr is not True: ret._krr = self._krr.copy() if self._kdr is not None and self._kdr is not True: ret._kdr = self._kdr.copy() if self._krd is not None and self._krd is not True: ret._krd = self._krd.copy() return ret
[docs] def finalize(self, vark): """Finalize the calculation of the correlation function. Parameters: vark (float): The variance of the scalar field. """ self._finalize() self._var_num = vark if self.bin_type in ['LogSAS', 'LogMultipole']: self._var_num *= 2
@property def raw_varzeta(self): if self._varzeta is None: self._calculate_varzeta(1) return self._varzeta[0] @property def varzeta(self): if self._comp_varzeta is None: return self.raw_varzeta else: return self._comp_varzeta def _clear(self): super()._clear() self._krr = None self._krd = None self._kdr = None self._zeta = None self._comp_varzeta = None
[docs] def calculateZeta(self, *, krr=None, kdr=None, krd=None): r"""Calculate the correlation function given another correlation function of random points using the same mask, and possibly cross correlations of the data and random. The krr value is the `KNNCorrelation` function for random points with the scalar field. One can also provide a cross correlation of the count data with randoms and the scalar. - If krr is None, the simple correlation function (self.zeta) is returned. - If only krr is given the compensated value :math:`\zeta = KDD - KRR` is returned. - if kdr is given and krd is None (or vice versa), then :math:`\zeta = KDD - 2KDR + KRR` is returned. - If kdr and krd are both given, then :math:`\zeta = KDD - KDR - KRD + KRR` is returned. where KDD is the data KNN correlation function, which is the current object. After calling this method, you can use the `Corr2.estimate_cov` method or use this correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and varzeta returned from this function will be available as attributes. Parameters: krr (KNNCorrelation): The correlation of the random points with the scalar field (KRR) (default: None) kdr (KNNCorrelation): The cross-correlation of the data with both randoms and the scalar field (KDR), if desired. (default: None) krd (KNNCorrelation): The cross-correlation of the randoms with both the data and the scalar field (KRD), if desired. (default: None) Returns: Tuple containing: - zeta = array of :math:`\zeta(r)` - varzeta = an estimate of the variance of :math:`\zeta(r)` """ # Calculate zeta based on which randoms are provided. if krr is not None: if self.bin_type == 'LogMultipole': raise TypeError("calculateZeta is not valid for LogMultipole binning") if kdr is None and krd is None: self._zeta = self.raw_zeta - krr.zeta elif krd is not None and kdr is None: self._zeta = self.raw_zeta - 2.*krd.zeta + krr.zeta elif kdr is not None and krd is None: self._zeta = self.raw_zeta - 2.*kdr.zeta + krr.zeta else: self._zeta = self.raw_zeta - kdr.zeta - krd.zeta + krr.zeta self._krr = krr self._kdr = kdr self._krd = krd if (krr.npatch2 not in (1,self.npatch2) or krr.npatch3 not in (1,self.npatch3) or krr.npatch1 != self.npatch1): raise RuntimeError("KRR must be run with the same patches as KDD") if krd is not None and (krd.npatch2 not in (1,self.npatch2) or krd.npatch1 != self.npatch1 or krd.npatch3 != self.npatch3): raise RuntimeError("KRD must be run with the same patches as KDD") if kdr is not None and (kdr.npatch3 not in (1,self.npatch3) or kdr.npatch1 != self.npatch1 or kdr.npatch2 != self.npatch2): raise RuntimeError("KDR must be run with the same patches as KDD") if len(self.results) > 0: template = next(iter(self.results.values())) # Just need something to copy. all_keys = list(krr.results.keys()) if krd is not None: all_keys += list(krd.results.keys()) if kdr is not None: all_keys += list(kdr.results.keys()) for ijk in all_keys: if ijk in self.results: continue new_cij = template.copy() new_cij._z[0][:] = 0 new_cij.weight[:] = 0 self.results[ijk] = new_cij self.__dict__.pop('_ok',None) self._cov = self.estimate_cov(self.var_method) self._comp_varzeta = np.zeros(self.data_shape, dtype=float) self._comp_varzeta.ravel()[:] = self.cov_diag else: self._comp_varzeta = self.raw_varzeta + krr.varzeta if krd is not None: self._comp_varzeta += krd.varzeta if kdr is not None: self._comp_varzeta += kdr.varzeta else: if krd is not None: raise TypeError("krd is invalid if krr is None") if kdr is not None: raise TypeError("kdr is invalid if krr is None") self._zeta = self.raw_zeta self._comp_varzeta = None return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only): super()._calculate_xi_from_pairs(pairs, corr_only) if self._krr is not None: # If r doesn't have patches, then convert all (i,i,i) pairs to (i,0,0). if self._krr.npatch2 == 1 and not all([p[1] == 0 for p in pairs]): pairs1 = [(i,0,0,w) for i,j,k,w in pairs if i == j == k] else: pairs1 = pairs pairs1 = self._krr._keep_ok(pairs1) self._krr._calculate_xi_from_pairs(pairs1, corr_only=True) if self._kdr is not None: pairs2 = pairs if self._kdr.npatch3 == 1 and not all([p[2] == 0 for p in pairs]): pairs2 = [(i,j,0,w) for i,j,k,w in pairs if j == k] pairs2 = self._kdr._keep_ok(pairs2) self._kdr._calculate_xi_from_pairs(pairs2, corr_only=True) if self._krd is not None: pairs3 = pairs if self._krd.npatch2 == 1 and not all([p[1] == 0 for p in pairs]): pairs3 = [(i,0,k,w) for i,j,k,w in pairs if i == k] pairs3 = self._krd._keep_ok(pairs3) self._krd._calculate_xi_from_pairs(pairs3, corr_only=True) if self._krr is None: self._zeta = None elif self._kdr is None and self._krd is None: self._zeta = self.raw_zeta - self._krr.zeta elif self._krd is not None and self._kdr is None: self._zeta = self.raw_zeta - 2.*self._krd.zeta + self._krr.zeta elif self._kdr is not None and self._krd is None: self._zeta = self.raw_zeta - 2.*self._kdr.zeta + self._krr.zeta else: self._zeta = self.raw_zeta - self._kdr.zeta - self._krd.zeta + self._krr.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False, write_cov=False): super().write(file_name, file_type=file_type, precision=precision, write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format( r""" zeta The estimator of :math:`\zeta` (For LogMultipole, this is split into real and imaginary parts, zetar and zetai.) sigma_zeta The sqrt of the variance estimate of :math:`\zeta`. """) @property def _write_class_col_names(self): if self.bin_type == 'LogMultipole': return ['zetar', 'zetai', 'sigma_zeta'] else: return ['zeta', 'sigma_zeta'] @property def _write_class_data(self): if self.bin_type == 'LogMultipole': return [self._z[0], self._z[1], np.sqrt(self.varzeta) ] else: return [self.zeta, np.sqrt(self.varzeta)] def _read_from_data(self, data, params): super()._read_from_data(data, params) s = self.data_shape if self.bin_type == 'LogMultipole': self._z[0] = data['zetar'].reshape(s) self._z[1] = data['zetai'].reshape(s) else: self._z[0] = data['zeta'].reshape(s) self._comp_varzeta = data['sigma_zeta'].reshape(s)**2 self._varzeta = [self._comp_varzeta]
[docs]class NKNCorrelation(Corr3): r"""This class handles the calculation and storage of a 3-point count-scalar-count correlation function, where as usual K represents any spin-0 scalar field. With this class, point 2 of the triangle (i.e. the vertex opposite d2) is the one with the scalar value. Use `KNNCorrelation` and `NNKCorrelation` for classes with the scalar in the other two positions. See the doc string of `Corr3` for a description of how the triangles are binned along with the attributes related to the different binning options. In addition to the attributes common to all `Corr3` subclasses, objects of this class hold the following attributes: Attributes: zeta: The correlation function, :math:`\zeta`. varzeta: The variance estimate, only including the shot noise propagated into the final correlation. The typical usage pattern is as follows: >>> nkn = treecorr.NKNCorrelation(config) >>> nkn.process(cat1, cat2, cat1) # Compute cross-correlation of two fields. >>> nkn.process(cat1, cat2, cat3) # Compute cross-correlation of three fields. >>> rkr.process(rand, cat2, rand) # Compute cross-correlation with randoms. >>> dkr.process(cat1, cat2, rand) # Compute cross-correlation with randoms and data >>> nkn.write(file_name) # Write out to a file. >>> nkn.calculateZeta(rkr=rkr, dkr=dkr) # Calculate zeta using randoms >>> zeta = nkn.zeta # Access correlation function >>> zetar = nkn.zetar # Or access real and imaginary parts separately >>> zetai = nkn.zetai Parameters: config (dict): A configuration dict that can be used to pass in kwargs if desired. This dict is allowed to have addition entries besides those listed in `Corr3`, which are ignored here. (default: None) logger: If desired, a logger object for logging. (default: None, in which case one will be built according to the config dict's verbose level.) Keyword Arguments: **kwargs: See the documentation for `Corr3` for the list of allowed keyword arguments, which may be passed either directly or in the config dict. """ _cls = 'NKNCorrelation' _letter1 = 'N' _letter2 = 'K' _letter3 = 'N' _letters = 'NKN' _builder = _treecorr.NKNCorr _calculateVar1 = lambda *args, **kwargs: None _calculateVar2 = staticmethod(calculateVarK) _calculateVar3 = lambda *args, **kwargs: None _sig1 = None _sig2 = 'sig_k' _sig3 = None _default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs): super().__init__(config, logger=logger, **kwargs) self._rkr = None self._dkr = None self._rkd = None shape = self.data_shape self._z[0] = np.zeros(shape, dtype=float) if self.bin_type == 'LogMultipole': self._z[1] = np.zeros(shape, dtype=float) self._zeta = None self._comp_varzeta = None self.logger.debug('Finished building NKNCorr')
@property def raw_zeta(self): return self._z[0] @property def zeta(self): if self._zeta is None: if self._z[1].size: return self._z[0] + 1j * self._z[1] else: return self._z[0] else: return self._zeta
[docs] def copy(self): ret = super().copy() # True is possible during read before we finish reading in these attributes. if self._rkr is not None and self._rkr is not True: ret._rkr = self._rkr.copy() if self._dkr is not None and self._dkr is not True: ret._dkr = self._dkr.copy() if self._rkd is not None and self._rkd is not True: ret._rkd = self._rkd.copy() return ret
[docs] def finalize(self, vark): """Finalize the calculation of the correlation function. Parameters: vark (float): The variance of the scalar field. """ self._finalize() self._var_num = vark if self.bin_type in ['LogSAS', 'LogMultipole']: self._var_num *= 2
@property def raw_varzeta(self): if self._varzeta is None: self._calculate_varzeta(1) return self._varzeta[0] @property def varzeta(self): if self._comp_varzeta is None: return self.raw_varzeta else: return self._comp_varzeta def _clear(self): super()._clear() self._rkr = None self._dkr = None self._rkd = None self._zeta = None self._comp_varzeta = None
[docs] def calculateZeta(self, *, rkr=None, dkr=None, rkd=None): r"""Calculate the correlation function given another correlation function of random points using the same mask, and possibly cross correlations of the data and random. The rkr value is the `NKNCorrelation` function for random points with the scalar field. One can also provide a cross correlation of the count data with randoms and the scalar. - If rkr is None, the simple correlation function (self.zeta) is returned. - If only rkr is given the compensated value :math:`\zeta = DKD - RKR` is returned. - if dkr is given and rkd is None (or vice versa), then :math:`\zeta = DKD - 2DKR + RKR` is returned. - If dkr and rkd are both given, then :math:`\zeta = DKD - DKR - RKD + RKR` is returned. where DKD is the data NKN correlation function, which is the current object. After calling this method, you can use the `Corr2.estimate_cov` method or use this correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and varzeta returned from this function will be available as attributes. Parameters: rkr (NKNCorrelation): The correlation of the random points with the scalar field (RKR) (default: None) dkr (NKNCorrelation): The cross-correlation of the data with both randoms and the scalar field (DKR), if desired. (default: None) rkd (NKNCorrelation): The cross-correlation of the randoms with both the data and the scalar field (RKD), if desired. (default: None) Returns: Tuple containing: - zeta = array of :math:`\zeta(r)` - varzeta = an estimate of the variance of :math:`\zeta(r)` """ # Calculate zeta based on which randoms are provided. if rkr is not None: if self.bin_type == 'LogMultipole': raise TypeError("calculateZeta is not valid for LogMultipole binning") if dkr is None and rkd is None: self._zeta = self.raw_zeta - rkr.zeta elif rkd is not None and dkr is None: self._zeta = self.raw_zeta - 2.*rkd.zeta + rkr.zeta elif dkr is not None and rkd is None: self._zeta = self.raw_zeta - 2.*dkr.zeta + rkr.zeta else: self._zeta = self.raw_zeta - dkr.zeta - rkd.zeta + rkr.zeta self._rkr = rkr self._dkr = dkr self._rkd = rkd if (rkr.npatch1 not in (1,self.npatch1) or rkr.npatch3 not in (1,self.npatch3) or rkr.npatch2 != self.npatch2): raise RuntimeError("RKR must be run with the same patches as DKD") if rkd is not None and (rkd.npatch1 not in (1,self.npatch1) or rkd.npatch2 != self.npatch2 or rkd.npatch3 != self.npatch3): raise RuntimeError("RKD must be run with the same patches as DKD") if dkr is not None and (dkr.npatch3 not in (1,self.npatch3) or dkr.npatch1 != self.npatch1 or dkr.npatch2 != self.npatch2): raise RuntimeError("DKR must be run with the same patches as DKD") if len(self.results) > 0: template = next(iter(self.results.values())) # Just need something to copy. all_keys = list(rkr.results.keys()) if rkd is not None: all_keys += list(rkd.results.keys()) if dkr is not None: all_keys += list(dkr.results.keys()) for ijk in all_keys: if ijk in self.results: continue new_cij = template.copy() new_cij._z[0][:] = 0 new_cij.weight[:] = 0 self.results[ijk] = new_cij self.__dict__.pop('_ok',None) self._cov = self.estimate_cov(self.var_method) self._comp_varzeta = np.zeros(self.data_shape, dtype=float) self._comp_varzeta.ravel()[:] = self.cov_diag else: self._comp_varzeta = self.raw_varzeta + rkr.varzeta if rkd is not None: self._comp_varzeta += rkd.varzeta if dkr is not None: self._comp_varzeta += dkr.varzeta else: if rkd is not None: raise TypeError("rkd is invalid if rkr is None") if dkr is not None: raise TypeError("dkr is invalid if rkr is None") self._zeta = self.raw_zeta self._comp_varzeta = None return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only): super()._calculate_xi_from_pairs(pairs, corr_only) if self._rkr is not None: # If r doesn't have patches, then convert all (i,i,i) pairs to (0,i,0). if self._rkr.npatch1 == 1 and not all([p[0] == 0 for p in pairs]): pairs1 = [(0,j,0,w) for i,j,k,w in pairs if i == j == k] else: pairs1 = pairs pairs1 = self._rkr._keep_ok(pairs1) self._rkr._calculate_xi_from_pairs(pairs1, corr_only=True) if self._dkr is not None: pairs2 = pairs if self._dkr.npatch3 == 1 and not all([p[2] == 0 for p in pairs]): pairs2 = [(i,j,0,w) for i,j,k,w in pairs if j == k] pairs2 = self._dkr._keep_ok(pairs2) self._dkr._calculate_xi_from_pairs(pairs2, corr_only=True) if self._rkd is not None: pairs3 = pairs if self._rkd.npatch1 == 1 and not all([p[0] == 0 for p in pairs]): pairs3 = [(0,j,k,w) for i,j,k,w in pairs if i == k] pairs3 = self._rkd._keep_ok(pairs3) self._rkd._calculate_xi_from_pairs(pairs3, corr_only=True) if self._rkr is None: self._zeta = None elif self._dkr is None and self._rkd is None: self._zeta = self.raw_zeta - self._rkr.zeta elif self._rkd is not None and self._dkr is None: self._zeta = self.raw_zeta - 2.*self._rkd.zeta + self._rkr.zeta elif self._dkr is not None and self._rkd is None: self._zeta = self.raw_zeta - 2.*self._dkr.zeta + self._rkr.zeta else: self._zeta = self.raw_zeta - self._dkr.zeta - self._rkd.zeta + self._rkr.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False, write_cov=False): super().write(file_name, file_type=file_type, precision=precision, write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format( r""" zeta The estimator of :math:`\zeta` (For LogMultipole, this is split into real and imaginary parts, zetar and zetai.) sigma_zeta The sqrt of the variance estimate of :math:`\zeta`. """) @property def _write_class_col_names(self): if self.bin_type == 'LogMultipole': return ['zetar', 'zetai', 'sigma_zeta'] else: return ['zeta', 'sigma_zeta'] @property def _write_class_data(self): if self.bin_type == 'LogMultipole': return [self._z[0], self._z[1], np.sqrt(self.varzeta) ] else: return [self.zeta, np.sqrt(self.varzeta)] def _read_from_data(self, data, params): super()._read_from_data(data, params) s = self.data_shape if self.bin_type == 'LogMultipole': self._z[0] = data['zetar'].reshape(s) self._z[1] = data['zetai'].reshape(s) else: self._z[0] = data['zeta'].reshape(s) self._comp_varzeta = data['sigma_zeta'].reshape(s)**2 self._varzeta = [self._comp_varzeta]
[docs]class NNKCorrelation(Corr3): r"""This class handles the calculation and storage of a 3-point count-count-scalar correlation function, where as usual K represents any spin-0 scalar field. With this class, point 3 of the triangle (i.e. the vertex opposite d3) is the one with the scalar value. Use `KNNCorrelation` and `NKNCorrelation` for classes with the scalar in the other two positions. See the doc string of `Corr3` for a description of how the triangles are binned along with the attributes related to the different binning options. In addition to the attributes common to all `Corr3` subclasses, objects of this class hold the following attributes: Attributes: zeta: The correlation function, :math:`\zeta`. varzeta: The variance estimate, only including the shot noise propagated into the final correlation. The typical usage pattern is as follows: >>> nnk = treecorr.NNKCorrelation(config) >>> nnk.process(cat1, cat2) # Compute cross-correlation of two fields. >>> nnk.process(cat1, cat2, cat3) # Compute cross-correlation of three fields. >>> rrk.process(rand, cat2) # Compute cross-correlation with randoms. >>> drk.process(cat1, rand, cat2) # Compute cross-correlation with randoms and data >>> nnk.write(file_name) # Write out to a file. >>> nnk.calculateZeta(rrk=rrk, drk=drk) # Calculate zeta using randoms >>> zeta = nnk.zeta # Access correlation function >>> zetar = nnk.zetar # Or access real and imaginary parts separately >>> zetai = nnk.zetai Parameters: config (dict): A configuration dict that can be used to pass in kwargs if desired. This dict is allowed to have addition entries besides those listed in `Corr3`, which are ignored here. (default: None) logger: If desired, a logger object for logging. (default: None, in which case one will be built according to the config dict's verbose level.) Keyword Arguments: **kwargs: See the documentation for `Corr3` for the list of allowed keyword arguments, which may be passed either directly or in the config dict. """ _cls = 'NNKCorrelation' _letter1 = 'N' _letter2 = 'N' _letter3 = 'K' _letters = 'NNK' _builder = _treecorr.NNKCorr _calculateVar1 = lambda *args, **kwargs: None _calculateVar2 = lambda *args, **kwargs: None _calculateVar3 = staticmethod(calculateVarK) _sig1 = None _sig2 = None _sig3 = 'sig_k' _default_angle_slop = 1
[docs] def __init__(self, config=None, *, logger=None, **kwargs): super().__init__(config, logger=logger, **kwargs) self._rrk = None self._drk = None self._rdk = None shape = self.data_shape self._z[0] = np.zeros(shape, dtype=float) if self.bin_type == 'LogMultipole': self._z[1] = np.zeros(shape, dtype=float) self._zeta = None self._comp_varzeta = None self.logger.debug('Finished building NNKCorr')
@property def raw_zeta(self): return self._z[0] @property def zeta(self): if self._zeta is None: if self._z[1].size: return self._z[0] + 1j * self._z[1] else: return self._z[0] else: return self._zeta
[docs] def copy(self): ret = super().copy() # True is possible during read before we finish reading in these attributes. if self._rrk is not None and self._rrk is not True: ret._rrk = self._rrk.copy() if self._drk is not None and self._drk is not True: ret._drk = self._drk.copy() if self._rdk is not None and self._rdk is not True: ret._rdk = self._rdk.copy() return ret
[docs] def finalize(self, vark): """Finalize the calculation of the correlation function. Parameters: vark (float): The variance of the scalar field. """ self._finalize() self._var_num = vark if self.bin_type in ['LogSAS', 'LogMultipole']: self._var_num *= 2
@property def raw_varzeta(self): if self._varzeta is None: self._calculate_varzeta(1) return self._varzeta[0] @property def varzeta(self): if self._comp_varzeta is None: return self.raw_varzeta else: return self._comp_varzeta def _clear(self): super()._clear() self._rrk = None self._rdk = None self._drk = None self._zeta = None self._comp_varzeta = None
[docs] def calculateZeta(self, *, rrk=None, drk=None, rdk=None): r"""Calculate the correlation function given another correlation function of random points using the same mask, and possibly cross correlations of the data and random. The rrk value is the `NNKCorrelation` function for random points with the scalar field. One can also provide a cross correlation of the count data with randoms and the scalar. - If rrk is None, the simple correlation function (self.zeta) is returned. - If only rrk is given the compensated value :math:`\zeta = DDK - RRK` is returned. - if drk is given and rdk is None (or vice versa), then :math:`\zeta = DDK - 2DRK + RRK` is returned. - If drk and rdk are both given, then :math:`\zeta = DDK - DRK - RDK + RRK` is returned. where DDK is the data NNK correlation function, which is the current object. After calling this method, you can use the `Corr2.estimate_cov` method or use this correlation object in the `estimate_multi_cov` function. Also, the calculated zeta and varzeta returned from this function will be available as attributes. Parameters: rrk (NNKCorrelation): The correlation of the random points with the scalar field (RRK) (default: None) drk (NNKCorrelation): The cross-correlation of the data with both randoms and the scalar field (DRK), if desired. (default: None) rdk (NNKCorrelation): The cross-correlation of the randoms with both the data and the scalar field (RDK), if desired. (default: None) Returns: Tuple containing: - zeta = array of :math:`\zeta(r)` - varzeta = an estimate of the variance of :math:`\zeta(r)` """ # Calculate zeta based on which randoms are provided. if rrk is not None: if self.bin_type == 'LogMultipole': raise TypeError("calculateZeta is not valid for LogMultipole binning") if drk is None and rdk is None: self._zeta = self.raw_zeta - rrk.zeta elif rdk is not None and drk is None: self._zeta = self.raw_zeta - 2.*rdk.zeta + rrk.zeta elif drk is not None and rdk is None: self._zeta = self.raw_zeta - 2.*drk.zeta + rrk.zeta else: self._zeta = self.raw_zeta - drk.zeta - rdk.zeta + rrk.zeta self._rrk = rrk self._drk = drk self._rdk = rdk if (rrk.npatch1 not in (1,self.npatch1) or rrk.npatch2 not in (1,self.npatch2) or rrk.npatch3 != self.npatch3): raise RuntimeError("RRK must be run with the same patches as DDK") if rdk is not None and (rdk.npatch1 not in (1,self.npatch1) or rdk.npatch2 != self.npatch2 or rdk.npatch3 != self.npatch3): raise RuntimeError("RDK must be run with the same patches as DDK") if drk is not None and (drk.npatch2 not in (1,self.npatch2) or drk.npatch1 != self.npatch1 or drk.npatch3 != self.npatch3): raise RuntimeError("DRK must be run with the same patches as DDK") if len(self.results) > 0: template = next(iter(self.results.values())) # Just need something to copy. all_keys = list(rrk.results.keys()) if rdk is not None: all_keys += list(rdk.results.keys()) if drk is not None: all_keys += list(drk.results.keys()) for ijk in all_keys: if ijk in self.results: continue new_cij = template.copy() new_cij._z[0][:] = 0 new_cij.weight[:] = 0 self.results[ijk] = new_cij self.__dict__.pop('_ok',None) self._cov = self.estimate_cov(self.var_method) self._comp_varzeta = np.zeros(self.data_shape, dtype=float) self._comp_varzeta.ravel()[:] = self.cov_diag else: self._comp_varzeta = self.raw_varzeta + rrk.varzeta if rdk is not None: self._comp_varzeta += rdk.varzeta if drk is not None: self._comp_varzeta += drk.varzeta else: if rdk is not None: raise TypeError("rdk is invalid if rrk is None") if drk is not None: raise TypeError("drk is invalid if rrk is None") self._zeta = self.raw_zeta self._comp_varzeta = None return self._zeta, self.varzeta
def _calculate_xi_from_pairs(self, pairs, corr_only): super()._calculate_xi_from_pairs(pairs, corr_only) if self._rrk is not None: # If r doesn't have patches, then convert all (i,i,i) pairs to (0,0,i). if self._rrk.npatch1 == 1 and not all([p[0] == 0 for p in pairs]): pairs1 = [(0,0,k,w) for i,j,k,w in pairs if i == j == k] else: pairs1 = pairs pairs1 = self._rrk._keep_ok(pairs1) self._rrk._calculate_xi_from_pairs(pairs1, corr_only=True) if self._drk is not None: pairs2 = pairs if self._drk.npatch2 == 1 and not all([p[1] == 0 for p in pairs]): pairs2 = [(i,0,k,w) for i,j,k,w in pairs if j == k] pairs2 = self._drk._keep_ok(pairs2) self._drk._calculate_xi_from_pairs(pairs2, corr_only=True) if self._rdk is not None: pairs3 = pairs if self._rdk.npatch1 == 1 and not all([p[0] == 0 for p in pairs]): pairs3 = [(0,j,k,w) for i,j,k,w in pairs if i == k] pairs3 = self._rdk._keep_ok(pairs3) self._rdk._calculate_xi_from_pairs(pairs3, corr_only=True) if self._rrk is None: self._zeta = None elif self._drk is None and self._rdk is None: self._zeta = self.raw_zeta - self._rrk.zeta elif self._rdk is not None and self._drk is None: self._zeta = self.raw_zeta - 2.*self._rdk.zeta + self._rrk.zeta elif self._drk is not None and self._rdk is None: self._zeta = self.raw_zeta - 2.*self._drk.zeta + self._rrk.zeta else: self._zeta = self.raw_zeta - self._drk.zeta - self._rdk.zeta + self._rrk.zeta
[docs] def write(self, file_name, *, file_type=None, precision=None, write_patch_results=False, write_cov=False): super().write(file_name, file_type=file_type, precision=precision, write_patch_results=write_patch_results, write_cov=write_cov)
write.__doc__ = Corr3.write.__doc__.format( r""" zeta The estimator of :math:`\zeta` (For LogMultipole, this is split into real and imaginary parts, zetar and zetai.) sigma_zeta The sqrt of the variance estimate of :math:`\zeta`. """) @property def _write_class_col_names(self): if self.bin_type == 'LogMultipole': return ['zetar', 'zetai', 'sigma_zeta'] else: return ['zeta', 'sigma_zeta'] @property def _write_class_data(self): if self.bin_type == 'LogMultipole': return [self._z[0], self._z[1], np.sqrt(self.varzeta) ] else: return [self.zeta, np.sqrt(self.varzeta)] def _read_from_data(self, data, params): super()._read_from_data(data, params) s = self.data_shape if self.bin_type == 'LogMultipole': self._z[0] = data['zetar'].reshape(s) self._z[1] = data['zetai'].reshape(s) else: self._z[0] = data['zeta'].reshape(s) self._comp_varzeta = data['sigma_zeta'].reshape(s)**2 self._varzeta = [self._comp_varzeta]