Source code for piff.twod_stats

# Copyright (c) 2016 by Mike Jarvis and the other collaborators on GitHub at
# https://github.com/rmjarvis/Piff  All rights reserved.
#
# Piff is free software: Redistribution and use in source and binary forms
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.

"""
.. module:: twod_stats

"""

import numpy as np
import warnings
import galsim

from .stats import Stats

[docs]class TwoDHistStats(Stats): """Statistics class that can make pretty colormaps where each bin has some arbitrary function applied to it. By default this will make a color map based on u and v coordinates of the input stars. The color scale is based on (by default) the median value of the objects in particular u-v voxel. After a call to :func:`compute`, the following attributes are accessible: :twodhists: A dictionary of two dimensional histograms, with keys 'u', 'v', 'T', 'g1', 'g2', 'T_model', 'g1_model', 'g2_model', 'dT', 'dg1', 'dg2' These histograms are two dimensional masked arrays where the value of the pixel corresponds to reducing_function([objects in u-v voxel]) :param nbins_u: Number of bins in u direction [default: 20] :param nbins_v: Number of bins in v direction [default: 20] :param reducing_function: Type of function to apply to grouped objects. numpy functions are prefixed by np. [default: 'np.median'] :param file_name: Name of the file to output to. [default: None] :param model_properties: Optionally a dict of properties to use for the model rendering. [default: None] :param logger: A logger object for logging debug info. [default: None] """ def __init__(self, nbins_u=20, nbins_v=20, reducing_function='np.median', file_name=None, model_properties=None, logger=None): self.nbins_u = nbins_u self.nbins_v = nbins_v self.reducing_function = eval(reducing_function) self.file_name = file_name self.model_properties = model_properties self.skip = False
[docs] def compute(self, psf, stars, logger=None): """ :param psf: A PSF Object :param stars: A list of Star instances. :param logger: A logger object for logging debug info. [default: None] """ logger = galsim.config.LoggerWrapper(logger) self.twodhists = {} # get the shapes logger.info("Measuring Star and Model Shapes") positions, shapes_truth, shapes_model = self.measureShapes( psf, stars, model_properties=self.model_properties, logger=logger) # Only use stars for which hsm was successful flag_truth = shapes_truth[:, 6] flag_model = shapes_model[:, 6] mask = (flag_truth == 0) & (flag_model == 0) logger.info("%d/%d measurements were successful",np.sum(mask),len(mask)) if np.sum(mask) == 0: logger.warning("All stars had hsm errors. TwoDHist plot will be empty.") self.skip = True return # define terms for the catalogs u = positions[mask, 0] v = positions[mask, 1] T = shapes_truth[mask, 3] g1 = shapes_truth[mask, 4] g2 = shapes_truth[mask, 5] T_model = shapes_model[mask, 3] g1_model = shapes_model[mask, 4] g2_model = shapes_model[mask, 5] dT = T - T_model dg1 = g1 - g1_model dg2 = g2 - g2_model # compute the indices logger.info("Computing TwoDHist indices") self.bins_u = np.linspace(np.min(u), np.max(u), num=self.nbins_u + 1) self.bins_v = np.linspace(np.min(v), np.max(v), num=self.nbins_v + 1) # digitize u and v. No such thing as entries below their min, so -1 to index indx_u = np.digitize(u, self.bins_u) - 1 indx_v = np.digitize(v, self.bins_v) - 1 # Make sure no points go one past the end (due to rounding when u=max(u), etc. np.putmask(indx_u, indx_u >= self.nbins_u, self.nbins_u-1) np.putmask(indx_v, indx_v >= self.nbins_v, self.nbins_v-1) # get unique indices unique_indx = np.vstack([tuple(row) for row in np.vstack((indx_u, indx_v)).T]) # compute the arrays logger.info("Computing TwoDHist arrays") # throw in coordinates for good measure self.twodhists['u'] = self._array_to_2dhist(u, indx_u, indx_v, unique_indx) self.twodhists['v'] = self._array_to_2dhist(v, indx_u, indx_v, unique_indx) # T self.twodhists['T'] = self._array_to_2dhist(T, indx_u, indx_v, unique_indx) # g1 self.twodhists['g1'] = self._array_to_2dhist(g1, indx_u, indx_v, unique_indx) # g2 self.twodhists['g2'] = self._array_to_2dhist(g2, indx_u, indx_v, unique_indx) # T_model self.twodhists['T_model'] = self._array_to_2dhist(T, indx_u, indx_v, unique_indx) # g1_model self.twodhists['g1_model'] = self._array_to_2dhist(g1_model, indx_u, indx_v, unique_indx) # g2_model self.twodhists['g2_model'] = self._array_to_2dhist(g2_model, indx_u, indx_v, unique_indx) # dT self.twodhists['dT'] = self._array_to_2dhist(dT, indx_u, indx_v, unique_indx) # dg1 self.twodhists['dg1'] = self._array_to_2dhist(dg1, indx_u, indx_v, unique_indx) # dg2 self.twodhists['dg2'] = self._array_to_2dhist(dg2, indx_u, indx_v, unique_indx)
[docs] def plot(self, logger=None, **kwargs): r"""Make the plots. :param logger: A logger object for logging debug info. [default: None] :params \**kwargs: Any additional kwargs go into the matplotlib plot() function. [ignored in this function] :returns: fig, ax """ from matplotlib.figure import Figure logger = galsim.config.LoggerWrapper(logger) fig = Figure(figsize=(12,9)) # In matplotlib 2.0, this will be # axs = fig.subplots(ncols=3, nrows=3) axs = [[ fig.add_subplot(3,3,1), fig.add_subplot(3,3,2), fig.add_subplot(3,3,3) ], [ fig.add_subplot(3,3,4), fig.add_subplot(3,3,5), fig.add_subplot(3,3,6) ], [ fig.add_subplot(3,3,7), fig.add_subplot(3,3,8), fig.add_subplot(3,3,9) ]] axs = np.array(axs, dtype=object) # left column gets the Y coordinate label axs[0, 0].set_ylabel('v') axs[1, 0].set_ylabel('v') axs[2, 0].set_ylabel('v') # bottom row gets the X coordinate label axs[2, 0].set_xlabel('u') axs[2, 1].set_xlabel('u') axs[2, 2].set_xlabel('u') axs[0, 0].set_title('T') axs[1, 0].set_title('T Model') axs[2, 0].set_title('dT') axs[0, 1].set_title('g1') axs[1, 1].set_title('g1 Model') axs[2, 1].set_title('dg1') axs[0, 2].set_title('g2') axs[1, 2].set_title('g2 Model') axs[2, 2].set_title('dg2') if self.skip: return fig, axs if not hasattr(self, 'twodhists'): raise RuntimeError("Must call compute before calling plot or write") # make the colormaps logger.info("Creating TwoDHist colormaps") # T and T_model share colorbar vmin__T = np.min([self.twodhists['T'], self.twodhists['T_model']]) vmax__T = np.max([self.twodhists['T'], self.twodhists['T_model']]) vcent__T = np.median([self.twodhists['T'], self.twodhists['T_model']]) cmap__T = self._shift_cmap(vmin__T, vmax__T, center=vcent__T) # g1, g2, g1_model, g2_model share colorbar vmin__g = np.min([self.twodhists['g1'], self.twodhists['g1_model'], self.twodhists['g2'], self.twodhists['g2_model']]) vmax__g = np.max([self.twodhists['g1'], self.twodhists['g1_model'], self.twodhists['g2'], self.twodhists['g2_model']]) cmap__g = self._shift_cmap(vmin__g, vmax__g) # dT gets own colorbar vmin__dT = np.min(self.twodhists['dT']) vmax__dT = np.max(self.twodhists['dT']) cmap__dT = self._shift_cmap(vmin__dT, vmax__dT) # dg1 and dg2 share a colorbar vmin__dg = np.min([self.twodhists['dg1'], self.twodhists['dg2']]) vmax__dg = np.max([self.twodhists['dg1'], self.twodhists['dg2']]) cmap__dg = self._shift_cmap(vmin__dg, vmax__dg) # make the plots logger.info("Creating TwoDHist plots") ax = axs[0, 0] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['T'], cmap=cmap__T, vmin=vmin__T, vmax=vmax__T) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[1, 0] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['T_model'], cmap=cmap__T, vmin=vmin__T, vmax=vmax__T) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[2, 0] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['dT'], cmap=cmap__dT, vmin=vmin__dT, vmax=vmax__dT) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[0, 1] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['g1'], cmap=cmap__g, vmin=vmin__g, vmax=vmax__g) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[1, 1] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['g1_model'], cmap=cmap__g, vmin=vmin__g, vmax=vmax__g) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[2, 1] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['dg1'], cmap=cmap__dg, vmin=vmin__dg, vmax=vmax__dg) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[0, 2] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['g2'], cmap=cmap__g, vmin=vmin__g, vmax=vmax__g) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[1, 2] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['g2_model'], cmap=cmap__g, vmin=vmin__g, vmax=vmax__g) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) ax = axs[2, 2] IM = ax.pcolor(self.bins_u, self.bins_v, self.twodhists['dg2'], cmap=cmap__dg, vmin=vmin__dg, vmax=vmax__dg) ax.set_xlim(min(self.bins_u), max(self.bins_u)) ax.set_ylim(min(self.bins_v), max(self.bins_v)) fig.colorbar(IM, ax=ax) return fig, axs
def _array_to_2dhist(self, z, indx_u, indx_v, unique_indx): C = np.ma.zeros((self.nbins_v, self.nbins_u)) C.mask = np.ones((self.nbins_v, self.nbins_u)) for unique in unique_indx: ui, vi = unique sample = z[(indx_u == ui) & (indx_v == vi)] assert len(sample) > 0 # This is ensured by how we calculate unique_indx value = self.reducing_function(sample) C[vi, ui] = value C.mask[vi, ui] = 0 return C @classmethod def _shift_cmap(self, vmin, vmax, center=0): from matplotlib import cm # want midpoint to be a float! midpoint = (center - vmin) * 1. / (vmax - vmin) # if b <= 0, then we want Blues_r if vmax <= center and vmin <= center: return cm.Blues_r # if a >= 0, then we want Reds elif vmin >= center and vmax >= center: return cm.Reds # catch inverse elif vmin >= center and vmax <= center: return self._shiftedColorMap(cm.RdBu_r, start=1.0, midpoint=1 - midpoint, stop=0.0) else: return self._shiftedColorMap(cm.RdBu_r, midpoint=midpoint) @classmethod def _shiftedColorMap(self, cmap, start=0, midpoint=0.5, stop=1.0, name='shiftedcmap'): ''' Taken from https://github.com/olgabot/prettyplotlib/blob/master/prettyplotlib/colors.py which makes beautiful plots by the way Function to offset the "center" of a colormap. Useful for data with a negative min and positive max and you want the middle of the colormap's dynamic range to be at zero Input ----- cmap : The matplotlib colormap to be altered start : Offset from lowest point in the colormap's range. Defaults to 0.0 (no lower ofset). Should be between 0.0 and `midpoint`. midpoint : The new center of the colormap. Defaults to 0.5 (no shift). Should be between 0.0 and 1.0. In general, this should be 1 - vmax/(vmax + abs(vmin)) For example if your data range from -15.0 to +5.0 and you want the center of the colormap at 0.0, `midpoint` should be set to 1 - 5/(5 + 15)) or 0.75 stop : Offset from highets point in the colormap's range. Defaults to 1.0 (no upper ofset). Should be between `midpoint` and 1.0. ''' from matplotlib.colors import LinearSegmentedColormap cdict = { 'red': [], 'green': [], 'blue': [], 'alpha': [] } # regular index to compute the colors reg_index = np.linspace(start, stop, 257) # shifted index to match the data shift_index = np.hstack([ np.linspace(0.0, midpoint, 128, endpoint=False), np.linspace(midpoint, 1.0, 129, endpoint=True) ]) for ri, si in zip(reg_index, shift_index): r, g, b, a = cmap(ri) cdict['red'].append((si, r, r)) cdict['green'].append((si, g, g)) cdict['blue'].append((si, b, b)) cdict['alpha'].append((si, a, a)) newcmap = LinearSegmentedColormap(name, cdict) # add some overunders newcmap.set_bad(color='g', alpha=0.75) newcmap.set_over(color='m', alpha=0.75) newcmap.set_under(color='c', alpha=0.75) return newcmap
[docs]class WhiskerStats(Stats): """Statistics class that can make whiskerplots. By default this will make a whisker plot based on u and v coordinates of the input stars. The whisker scale is based on (by default) the median value of the objects in a particular u-v voxel. After a call to :func:`compute`, the following attributes are accessible: :twodhists: A dictionary of two dimensional histograms, with keys 'u', 'v', 'w1', 'w2', 'w1_model', 'w2_model', 'dw1', 'dw2' These histograms are two dimensional masked arrays where the value of the pixel corresponds to reducing_function([objects in u-v voxel]) .. note:: There are a couple different ways to define your whiskers. Here we have taken the approach that the whisker represents the ellipticity as: theta = arctan(e2, e1) / 2 r = sqrt(e1 ** 2 + e2 ** 2) w1 = r cos(theta) w2 = r sin(theta) Because e1, e2 do not have units, w does not either. :param file_name: Name of the file to output to. [default: None] :param nbins_u: Number of bins in u direction [default: 20] :param nbins_v: Number of bins in v direction [default: 20] :param reducing_function: Type of function to apply to grouped objects. numpy functions are prefixed by np. [default: 'np.median'] :param scale: An overal scale factor by which to scale the size of the all whiskers. [default: 1] :param resid_scale: An additional factor for the scale size of the residual whiskers only. [default: 2] :param model_properties: Optionally a dict of properties to use for the model rendering. [default: None] :param logger: A logger object for logging debug info. [default: None] """ def __init__(self, file_name=None, nbins_u=20, nbins_v=20, reducing_function='np.median', scale=1, resid_scale=2, model_properties=None, logger=None): self.file_name = file_name self.nbins_u = nbins_u self.nbins_v = nbins_v self.reducing_function = eval(reducing_function) self.scale = scale self.resid_scale = resid_scale self.model_properties = model_properties self.skip = False
[docs] def compute(self, psf, stars, logger=None): """ :param psf: A PSF Object :param stars: A list of Star instances. :param logger: A logger object for logging debug info. [default: None] """ logger = galsim.config.LoggerWrapper(logger) self.twodhists = {} # get the shapes logger.info("Measuring Star and Model Shapes") positions, shapes_truth, shapes_model = self.measureShapes( psf, stars, model_properties=self.model_properties, logger=logger) # Only use stars for which hsm was successful flag_truth = shapes_truth[:, 6] flag_model = shapes_model[:, 6] mask = (flag_truth == 0) & (flag_model == 0) logger.info("%d/%d measurements were successful",np.sum(mask),len(mask)) if np.sum(mask) == 0: logger.warning("All stars had hsm errors. Whisker plot will be empty.") self.skip = True return # define terms for the catalogs u = positions[mask, 0] v = positions[mask, 1] T = shapes_truth[mask, 3] g1 = shapes_truth[mask, 4] g2 = shapes_truth[mask, 5] T_model = shapes_model[mask, 3] g1_model = shapes_model[mask, 4] g2_model = shapes_model[mask, 5] dT = T - T_model dg1 = g1 - g1_model dg2 = g2 - g2_model mag_w = np.sqrt(np.square(g1) + np.square(g2)) phi = np.arctan2(g2, g1) / 2. w1 = mag_w * np.cos(phi) w2 = mag_w * np.sin(phi) mag_w_model = np.sqrt(np.square(g1_model) + np.square(g2_model)) phi_model = np.arctan2(g2_model, g1_model) / 2. w1_model = mag_w_model * np.cos(phi_model) w2_model = mag_w_model * np.sin(phi_model) dmag_w = np.sqrt(np.square(dg1) + np.square(dg2)) dphi = np.arctan2(dg2, dg1) / 2. dw1 = dmag_w * np.cos(dphi) dw2 = dmag_w * np.sin(dphi) # compute the indices logger.info("Computing Whisker indices") self.bins_u = np.linspace(np.min(u), np.max(u), num=self.nbins_u + 1) self.bins_v = np.linspace(np.min(v), np.max(v), num=self.nbins_v + 1) # digitize u and v. No such thing as entries below their min, so -1 to index indx_u = np.digitize(u, self.bins_u) - 1 indx_v = np.digitize(v, self.bins_v) - 1 # Make sure no points go one past the end (due to rounding when u=max(u), etc. np.putmask(indx_u, indx_u >= self.nbins_u, self.nbins_u-1) np.putmask(indx_v, indx_v >= self.nbins_v, self.nbins_v-1) # get unique indices unique_indx = np.vstack([tuple(row) for row in np.vstack((indx_u, indx_v)).T]) # compute the arrays logger.info("Computing Whisker arrays") self.twodhists['u'] = self._array_to_2dhist(u, indx_u, indx_v, unique_indx) self.twodhists['v'] = self._array_to_2dhist(v, indx_u, indx_v, unique_indx) # w1 self.twodhists['w1'] = self._array_to_2dhist(w1, indx_u, indx_v, unique_indx) # w2 self.twodhists['w2'] = self._array_to_2dhist(w2, indx_u, indx_v, unique_indx) # w1_model self.twodhists['w1_model'] = self._array_to_2dhist(w1_model, indx_u, indx_v, unique_indx) # w2_model self.twodhists['w2_model'] = self._array_to_2dhist(w2_model, indx_u, indx_v, unique_indx) # dw1 self.twodhists['dw1'] = self._array_to_2dhist(dw1, indx_u, indx_v, unique_indx) # dw2 self.twodhists['dw2'] = self._array_to_2dhist(dw2, indx_u, indx_v, unique_indx)
[docs] def plot(self, logger=None, **kwargs): r"""Make the plots. :param logger: A logger object for logging debug info. [default: None] :params \**kwargs: Any additional kwargs go into the matplotlib plot() function. [ignored in this function] :returns: fig, ax """ from matplotlib.figure import Figure logger = galsim.config.LoggerWrapper(logger) fig = Figure(figsize=(7.5,4)) ax0, ax1 = fig.subplots(1, 2, sharey=True, subplot_kw={'aspect' : 'equal'}) ax0.axis('off') ax0.set_title('Raw PSF') #ax0.set_xlabel('u') #ax0.set_ylabel('v') ax1.axis('off') ax1.set_title('PSF Residuals') #ax1.set_xlabel('u') #ax1.set_ylabel('v') if self.skip: return fig, [ax0, ax1] if not hasattr(self, 'twodhists'): raise RuntimeError("Must call compute before calling plot or write") # make the plots logger.info("Creating Whisker plots") # configure to taste # bigger scale = smaller whiskers quiver_dict = dict(headlength=0, headwidth=0, headaxislength=0, minlength=0, pivot='middle', scale_units='xy', width=0.001, color='blue', ) # raw whiskers u = self.twodhists['u'] v = self.twodhists['v'] w1 = self.twodhists['w1'] w2 = self.twodhists['w2'] dw1 = self.twodhists['dw1'] dw2 = self.twodhists['dw2'] use = u.mask == 0 # Note: the quiver "scale" is such that larger value = smaller whiskers. # This is backwards of how I think of scale, which is why the / here, not *. scale = 1.e-4 / self.scale qv = ax0.quiver(u[use], v[use], w1[use], w2[use], scale=scale, **quiver_dict) # quiverkey ref = 0.03 ref_label = 'e = %s'%ref ax0.quiverkey(qv, 0.15, 0.00, ref, ref_label, coordinates='axes', color='darkred', labelcolor='darkred', labelpos='S') # residual whiskers scale = 1.e-4 / self.scale / self.resid_scale qv = ax1.quiver(u[use], v[use], dw1[use], dw2[use], scale=scale, **quiver_dict) # quiverkey ref = 0.03 / self.resid_scale ref_label = 'e = %s'%ref ax1.quiverkey(qv, 0.85, 0.00, ref, ref_label, coordinates='axes', color='darkred', labelcolor='darkred', labelpos='S') return fig, [ax0, ax1]
def _array_to_2dhist(self, z, indx_u, indx_v, unique_indx): C = np.ma.zeros((self.nbins_v, self.nbins_u)) C.mask = np.ones((self.nbins_v, self.nbins_u)) for unique in unique_indx: ui, vi = unique sample = z[(indx_u == ui) & (indx_v == vi)] assert len(sample) > 0 value = self.reducing_function(sample) C[vi, ui] = value C.mask[vi, ui] = 0 return C