Source code for piff.psf

# Copyright (c) 2016 by Mike Jarvis and the other collaborators on GitHub at
# https://github.com/rmjarvis/Piff  All rights reserved.
#
# Piff is free software: Redistribution and use in source and binary forms
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.

"""
.. module:: psf
"""

import numpy as np
import fitsio
import galsim
import sys

from .star import Star, StarData
from .util import write_kwargs, read_kwargs

[docs]class PSF(object): """The base class for describing a PSF model across a field of view. The usual way to create a PSF is through one of the two factory functions:: >>> psf = piff.PSF.process(config, logger) >>> psf = piff.PSF.read(file_name, logger) The first is used to build a PSF model from the data according to a config dict. The second is used to read in a PSF model from disk. """
[docs] @classmethod def process(cls, config_psf, logger=None): """Process the config dict and return a PSF instance. As the PSF class is an abstract base class, the returned type will in fact be some subclass of PSF according to the contents of the config dict. The provided config dict is typically the 'psf' field in the base config dict in a YAML file, although for compound PSF types, it may be the field for just one of several components. This function merely creates a "blank" PSF object. It does not actually do any part of the solution yet. Typically this will be followed by fit: >>> psf = piff.PSF.process(config['psf']) >>> stars, wcs, pointing = piff.Input.process(config['input']) >>> psf.fit(stars, wcs, pointing) at which point, the ``psf`` instance would have a solution to the PSF model. :param config_psf: A dict specifying what type of PSF to build along with the appropriate kwargs for building it. :param logger: A logger object for logging debug info. [default: None] :returns: a PSF instance of the appropriate type. """ import piff import yaml logger = galsim.config.LoggerWrapper(logger) logger.debug("Parsing PSF based on config dict:") logger.debug(yaml.dump(config_psf, default_flow_style=False)) # Get the class to use for the PSF psf_type = config_psf.get('type', 'Simple') + 'PSF' logger.debug("PSF type is %s",psf_type) cls = getattr(piff, psf_type) # Read any other kwargs in the psf field kwargs = cls.parseKwargs(config_psf, logger) # Build PSF object logger.info("Building %s",psf_type) psf = cls(**kwargs) logger.debug("Done building PSF") return psf
[docs] @classmethod def parseKwargs(cls, config_psf, logger=None): """Parse the psf field of a configuration dict and return the kwargs to use for initializing an instance of the class. The base class implementation just returns the kwargs as they are, but derived classes might want to override this if they need to do something more sophisticated with them. :param config_psf: The psf field of the configuration dict, config['psf'] :param logger: A logger object for logging debug info. [default: None] :returns: a kwargs dict to pass to the initializer """ raise NotImplementedError("Derived classes must define the parseKwargs function")
[docs] def draw(self, x, y, chipnum=None, flux=1.0, center=None, offset=None, stamp_size=48, image=None, logger=None, **kwargs): r"""Draws an image of the PSF at a given location. The normal usage would be to specify (chipnum, x, y), in which case Piff will use the stored wcs information for that chip to interpolate to the given position and draw an image of the PSF: >>> image = psf.draw(chipnum=4, x=103.3, y=592.0, stamp_size=48) However, if the PSF interpolation used extra properties for the interpolation (cf. psf.interp_property_names), you need to provide them as additional kwargs. >>> print(psf.interp_property_names) ('u','v','ri_color') >>> image = psf.draw(chipnum=4, x=103.3, y=592.0, ri_color=0.23, stamp_size=48) Normally, the image is constructed automatically based on stamp_size, in which case the WCS will be taken to be the local Jacobian at this location on the original image. However, if you provide your own image using the :image: argument, then whatever WCS is present in that image will be respected. E.g. if you want an image of the PSF in sky coordinates rather than image coordinates, you can provide an image with just a pixel scale for the WCS. When drawing the PSF, there are a few options regarding how the profile will be centered on the image. 1. The default behavior (``center==None``) is to draw the profile centered at the same (x,y) as you requested for the location of the PSF in the original image coordinates. The returned image will not (normally) be as large as the full image -- it will just be a postage stamp centered roughly around (x,y). The image.bounds give the bounding box of this stamp, and within this, the PSF will be centered at position (x,y). 2. If you want to center the profile at some other arbitrary position, you may provide a ``center`` parameter, which should be a tuple (xc,yc) giving the location at which you want the PSF to be centered. The bounding box will still be around the nominal (x,y) position, so this should only be used for small adjustments to the (x,y) value if you want it centered at a slightly different location. 3. If you provide your own image with the ``image`` parameter, then you may set the ``center`` to any location in this box (or technically off it -- it doesn't check that the center is actually inside the bounding box). This may be useful if you want to draw on an image with origin at (0,0) or (1,1) and just put the PSF at the location you want. 4. If you want the PSf centered exactly in the center of the image, then you can use ``center=True``. This will work for either an automatically built image or one that you provide. 5. With any of the above options you may additionally supply an ``offset`` parameter, which will apply a slight offset to the calculated center. This is probably only useful in conjunction with the default ``center=None`` or ``center=True``. :param x: The x position of the desired PSF in the original image coordinates. :param y: The y position of the desired PSF in the original image coordinates. :param chipnum: Which chip to use for WCS information. [required if the psf model covers more than a single chip] :param flux: Flux of PSF to be drawn [default: 1.0] :param center: (xc,yc) tuple giving the location on the image where you want the nominal center of the profile to be drawn. Also allowed is the value center=True to place in the center of the image. [default: None, which means draw at the position (x,y) of the PSF.] :param offset: Optional (dx,dy) tuple giving an additional offset relative to the center. [default: None] :param stamp_size: The size of the image to construct if no image is provided. [default: 48] :param image: An existing image on which to draw, if desired. [default: None] :param logger: A logger object for logging debug info. [default: None] :param \**kwargs: Any additional properties required for the interpolation. :returns: A GalSim Image of the PSF """ logger = galsim.config.LoggerWrapper(logger) chipnum = self._check_chipnum(chipnum) prof, method = self.get_profile(x,y,chipnum=chipnum, flux=flux, logger=logger, **kwargs) logger.debug("Drawing star at (%s,%s) on chip %s", x, y, chipnum) # Make the image if necessary if image is None: image = galsim.Image(stamp_size, stamp_size, dtype=float) # Make the center of the image (close to) the image_pos xcen = int(np.ceil(x - (0.5 if image.array.shape[1] % 2 == 1 else 0))) ycen = int(np.ceil(y - (0.5 if image.array.shape[0] % 2 == 1 else 0))) image.setCenter(xcen, ycen) # If no wcs is given, use the original wcs if image.wcs is None: image.wcs = self.wcs[chipnum] # Handle the input center if center is None: center = (x, y) elif center is True: center = image.true_center center = (center.x, center.y) elif not isinstance(center, tuple): raise ValueError("Invalid center parameter: %r. Must be tuple or None or True"%( center)) # Handle offset if given if offset is not None: center = (center[0] + offset[0], center[1] + offset[1]) prof.drawImage(image, method=method, center=center) return image
[docs] def get_profile(self, x, y, chipnum=None, flux=1.0, logger=None, **kwargs): r"""Get the PSF profile at the given position as a GalSim GSObject. The normal usage would be to specify (chipnum, x, y), in which case Piff will use the stored wcs information for that chip to interpolate to the given position and draw an image of the PSF: >>> prof, method = psf.get_profile(chipnum=4, x=103.3, y=592.0) The first return value, prof, is the GSObject describing the PSF profile. The second one, method, is the method parameter that should be used when drawing the profile using ``prof.drawImage(..., method=method)``. This may be either 'no_pixel' or 'auto' depending on whether the PSF model already includes the pixel response or not. Some underlying models includ the pixel response, and some don't, so this difference needs to be accounted for properly when drawing. This method is also appropriate if you first convolve the PSF by some other (e.g. galaxy) profile and then draw that. If the PSF interpolation used extra properties for the interpolation (cf. psf.extra_interp_properties), you need to provide them as additional kwargs. >>> print(psf.extra_interp_properties) ('ri_color',) >>> prof, method = psf.get_profile(chipnum=4, x=103.3, y=592.0, ri_color=0.23) :param x: The x position of the desired PSF in the original image coordinates. :param y: The y position of the desired PSF in the original image coordinates. :param chipnum: Which chip to use for WCS information. [required if the psf model covers more than a single chip] :param flux: Flux of PSF model [default: 1.0] :param \**kwargs: Any additional properties required for the interpolation. :returns: (profile, method) profile = A GalSim GSObject of the PSF method = either 'no_pixel' or 'auto' indicating which method to use when drawing the profile on an image. """ logger = galsim.config.LoggerWrapper(logger) chipnum = self._check_chipnum(chipnum) properties = {'chipnum' : chipnum} for key in self.interp_property_names: if key in ['x','y','u','v']: continue if key not in kwargs: raise TypeError("Extra interpolation property %r is required"%key) properties[key] = kwargs.pop(key) if len(kwargs) != 0: raise TypeError("Unexpected keyword argument(s) %r"%list(kwargs.keys())[0]) image_pos = galsim.PositionD(x,y) wcs = self.wcs[chipnum] field_pos = StarData.calculateFieldPos(image_pos, wcs, self.pointing, properties) u,v = field_pos.x, field_pos.y star = Star.makeTarget(x=x, y=y, u=u, v=v, wcs=wcs, properties=properties, pointing=self.pointing) logger.debug("Getting PSF profile at (%s,%s) on chip %s", x, y, chipnum) # Interpolate and adjust the flux of the star. star = self.interpolateStar(star).withFlux(flux) # The last step is implementd in the derived classes. prof, method = self._getProfile(star) return prof, method
def _check_chipnum(self, chipnum): chipnums = list(self.wcs.keys()) if chipnum is None: if len(chipnums) == 1: chipnum = chipnums[0] else: raise ValueError("chipnum is required. Must be one of %s", str(chipnums)) elif chipnum not in chipnums: raise ValueError("Invalid chipnum. Must be one of %s", str(chipnums)) return chipnum
[docs] def interpolateStarList(self, stars): """Update the stars to have the current interpolated fit parameters according to the current PSF model. :param stars: List of Star instances to update. :returns: List of Star instances with their fit parameters updated. """ return [self.interpolateStar(star) for star in stars]
[docs] def interpolateStar(self, star): """Update the star to have the current interpolated fit parameters according to the current PSF model. :param star: Star instance to update. :returns: Star instance with its fit parameters updated. """ raise NotImplementedError("Derived classes must define the interpolateStar function")
[docs] def drawStarList(self, stars, copy_image=True): """Generate PSF images for given stars. Takes advantage of interpolateList for significant speedup with some interpolators. .. note:: If the stars already have the fit parameters calculated, then this will trust those values and not redo the interpolation. If this might be a concern, you can force the interpolation to be redone by running >>> stars = psf.interpolateList(stars) before running `drawStarList`. :param stars: List of Star instances holding information needed for interpolation as well as an image/WCS into which PSF will be rendered. :param copy_image: If False, will use the same image object. If True, will copy the image and then overwrite it. [default: True] :returns: List of Star instances with its image filled with rendered PSF """ if any(star.fit is None or star.fit.params is None for star in stars): stars = self.interpolateStarList(stars) return [self._drawStar(star, copy_image=copy_image) for star in stars]
[docs] def drawStar(self, star, copy_image=True, center=None): """Generate PSF image for a given star. .. note:: If the star already has the fit parameters calculated, then this will trust those values and not redo the interpolation. If this might be a concern, you can force the interpolation to be redone by running >>> star = psf.interpolateList(star) before running `drawStar`. :param star: Star instance holding information needed for interpolation as well as an image/WCS into which PSF will be rendered. :param copy_image: If False, will use the same image object. If True, will copy the image and then overwrite it. [default: True] :param center: An optional tuple (x,y) location for where to center the drawn profile in the image. [default: None, which draws at the star's location.] :returns: Star instance with its image filled with rendered PSF """ # Interpolate parameters to this position/properties (if not already done): if star.fit is None or star.fit.params is None: star = self.interpolateStar(star) # Render the image return self._drawStar(star, copy_image=copy_image, center=center)
def _drawStar(self, star, copy_image=True, center=None): # Derived classes may choose to override any of the above functions # But they have to at least override this one and interpolateStar to implement # their actual PSF model. raise NotImplementedError("Derived classes must define the _drawStar function") def _getProfile(self, star): raise NotImplementedError("Derived classes must define the _getProfile function")
[docs] def write(self, file_name, logger=None): """Write a PSF object to a file. :param file_name: The name of the file to write to. :param logger: A logger object for logging debug info. [default: None] """ logger = galsim.config.LoggerWrapper(logger) logger.warning("Writing PSF to file %s",file_name) with fitsio.FITS(file_name,'rw',clobber=True) as f: self._write(f, 'psf', logger)
def _write(self, fits, extname, logger=None): """This is the function that actually does the work for the write function. Composite PSF classes that need to iterate can call this multiple times as needed. :param fits: An open fitsio.FITS object :param extname: The name of the extension with the psf information. :param logger: A logger object for logging debug info. """ from . import __version__ as piff_version if len(fits) == 1: header = {'piff_version': piff_version} fits.write(data=None, header=header) psf_type = self.__class__.__name__ write_kwargs(fits, extname, dict(self.kwargs, type=psf_type, piff_version=piff_version)) logger.info("Wrote the basic PSF information to extname %s", extname) Star.write(self.stars, fits, extname=extname + '_stars') logger.info("Wrote the PSF stars to extname %s", extname + '_stars') self.writeWCS(fits, extname=extname + '_wcs', logger=logger) logger.info("Wrote the PSF WCS to extname %s", extname + '_wcs') self._finish_write(fits, extname=extname, logger=logger)
[docs] @classmethod def read(cls, file_name, logger=None): """Read a PSF object from a file. :param file_name: The name of the file to read. :param logger: A logger object for logging debug info. [default: None] :returns: a PSF instance """ logger = galsim.config.LoggerWrapper(logger) logger.warning("Reading PSF from file %s",file_name) with fitsio.FITS(file_name,'r') as f: logger.debug('opened FITS file') return cls._read(f, 'psf', logger)
@classmethod def _read(cls, fits, extname, logger): """This is the function that actually does the work for the read function. Composite PSF classes that need to iterate can call this multiple times as needed. :param fits: An open fitsio.FITS object :param extname: The name of the extension with the psf information. :param logger: A logger object for logging debug info. """ import piff # Read the type and kwargs from the base extension assert extname in fits assert 'type' in fits[extname].get_colnames() kwargs = read_kwargs(fits, extname) psf_type = kwargs.pop('type') # If piff_version is not in the file, then it was written prior to version 1.3. # Since we don't know what version it was, we just use None. piff_version = kwargs.pop('piff_version',None) # Check that this is a valid PSF type psf_classes = piff.util.get_all_subclasses(piff.PSF) valid_psf_types = dict([ (c.__name__, c) for c in psf_classes ]) if psf_type not in valid_psf_types: raise ValueError("psf type %s is not a valid Piff PSF"%psf_type) psf_cls = valid_psf_types[psf_type] # Read the stars, wcs, pointing values stars = Star.read(fits, extname + '_stars') logger.debug("stars = %s",stars) wcs, pointing = cls.readWCS(fits, extname + '_wcs', logger=logger) logger.debug("wcs = %s, pointing = %s",wcs,pointing) # Make the PSF instance psf = psf_cls(**kwargs) psf.stars = stars psf.wcs = wcs psf.pointing = pointing # Just in case the class needs to do something else at the end. psf._finish_read(fits, extname, logger) # Save the piff version as an attibute. psf.piff_version = piff_version return psf
[docs] def writeWCS(self, fits, extname, logger): """Write the WCS information to a FITS file. :param fits: An open fitsio.FITS object :param extname: The name of the extension to write to :param logger: A logger object for logging debug info. """ import base64 try: import cPickle as pickle except ImportError: import pickle logger = galsim.config.LoggerWrapper(logger) # Start with the chipnums chipnums = list(self.wcs.keys()) cols = [ chipnums ] dtypes = [ ('chipnums', int) ] # GalSim WCS objects can be serialized via pickle wcs_str = [ base64.b64encode(pickle.dumps(w)) for w in self.wcs.values() ] max_len = np.max([ len(s) for s in wcs_str ]) # Some GalSim WCS serializations are rather long. In particular, the Pixmappy one # is longer than the maximum length allowed for a column in a fits table (28799). # So split it into chunks of size 2**14 (mildly less than this maximum). chunk_size = 2**14 nchunks = max_len // chunk_size + 1 cols.append( [nchunks]*len(chipnums) ) dtypes.append( ('nchunks', int) ) # Update to size of chunk we actually need. chunk_size = (max_len + nchunks - 1) // nchunks chunks = [ [ s[i:i+chunk_size] for i in range(0, max_len, chunk_size) ] for s in wcs_str ] cols.extend(zip(*chunks)) dtypes.extend( ('wcs_str_%04d'%i, bytes, chunk_size) for i in range(nchunks) ) if self.pointing is not None: # Currently, there is only one pointing for all the chips, but write it out # for each row anyway. dtypes.extend( (('ra', float), ('dec', float)) ) ra = [self.pointing.ra / galsim.hours] * len(chipnums) dec = [self.pointing.dec / galsim.degrees] * len(chipnums) cols.extend( (ra, dec) ) data = np.array(list(zip(*cols)), dtype=dtypes) fits.write_table(data, extname=extname)
[docs] @classmethod def readWCS(cls, fits, extname, logger): """Read the WCS information from a FITS file. :param fits: An open fitsio.FITS object :param extname: The name of the extension to read from :param logger: A logger object for logging debug info. :returns: wcs, pointing where wcs is a dict of galsim.BaseWCS instances and pointing is a galsim.CelestialCoord instance """ import base64 try: import cPickle as pickle except ImportError: import pickle assert extname in fits assert 'chipnums' in fits[extname].get_colnames() assert 'nchunks' in fits[extname].get_colnames() data = fits[extname].read() chipnums = data['chipnums'] nchunks = data['nchunks'] nchunks = nchunks[0] # These are all equal, so just take first one. wcs_keys = [ 'wcs_str_%04d'%i for i in range(nchunks) ] wcs_str = [ data[key] for key in wcs_keys ] # Get all wcs_str columns try: wcs_str = [ b''.join(s) for s in zip(*wcs_str) ] # Rejoint into single string each except TypeError: # pragma: no cover # fitsio 1.0 returns strings wcs_str = [ ''.join(s) for s in zip(*wcs_str) ] # Rejoint into single string each wcs_str = [ base64.b64decode(s) for s in wcs_str ] # Convert back from b64 encoding # Convert back into wcs objects try: wcs_list = [ pickle.loads(s, encoding='bytes') for s in wcs_str ] except Exception: # If the file was written by py2, the bytes encoding might raise here, # or it might not until we try to use it. wcs_list = [ pickle.loads(s, encoding='latin1') for s in wcs_str ] wcs = dict(zip(chipnums, wcs_list)) try: # If this doesn't work, then the file was probably written by py2, not py3 repr(wcs) except Exception: logger.info('Failed to decode wcs with bytes encoding.') logger.info('Retry with encoding="latin1" in case file written with python 2.') wcs_list = [ pickle.loads(s, encoding='latin1') for s in wcs_str ] wcs = dict(zip(chipnums, wcs_list)) repr(wcs) # Work-around for a change in the GalSim API with 2.0 # If the piff file was written with pre-2.0 GalSim, this fixes it. for key in wcs: w = wcs[key] if hasattr(w, '_origin') and isinstance(w._origin, galsim._galsim.PositionD): w._origin = galsim.PositionD(w._origin) if 'ra' in fits[extname].get_colnames(): ra = data['ra'] dec = data['dec'] pointing = galsim.CelestialCoord(ra[0] * galsim.hours, dec[0] * galsim.degrees) else: pointing = None return wcs, pointing
# Make a global function, piff.read, as an alias for piff.PSF.read, since that's the main thing # users will want to do as their starting point for using a piff file. def read(file_name, logger=None): """Read a Piff PSF object from a file. .. note:: The returned PSF instance will have an attribute piff_version, which indicates the version of Piff that was used to create the file. (If it was written with Piff version >= 1.3.0.) :param file_name: The name of the file to read. :param logger: A logger object for logging debug info. [default: None] :returns: a piff.PSF instance """ return PSF.read(file_name, logger=logger)