Source code for piff.config

# Copyright (c) 2016 by Mike Jarvis and the other collaborators on GitHub at
# https://github.com/rmjarvis/Piff  All rights reserved.
#
# Piff is free software: Redistribution and use in source and binary forms
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.

"""
.. module:: config
"""

import yaml
import os
import galsim

[docs]def setup_logger(verbose=1, log_file=None): """Build a logger object to use for logging progress Note: This will update the verbosity if a previous call to setup_logger used a different value for verbose. However, it will not update the handler to use a different log_file or switch between using a log_file and stdout. :param verbose: A number from 0-3 giving the level of verbosity to use. [default: 1] :param log_file: A file name to which to output the logging information. [default: None] :returns: a logging.Logger instance """ import logging # Parse the integer verbosity level from the command line args into a logging_level string logging_levels = { 0: logging.CRITICAL, 1: logging.WARNING, 2: logging.INFO, 3: logging.DEBUG } logging_level = logging_levels[verbose] # Setup logging to go to sys.stdout or (if requested) to an output file logger = logging.getLogger('piff') logger.handlers = [] # Remove any existing handlers if log_file is None: handle = logging.StreamHandler() else: handle = logging.FileHandler(log_file) formatter = logging.Formatter('%(message)s') # Simple text output handle.setFormatter(formatter) logger.addHandler(handle) logger.setLevel(logging_level) return logger
[docs]def parse_variables(config, variables, logger): """Parse configuration variables and add them to the config dict The command line variables should be specified as key=value. The key string can include dots, such as interp.order=2, which means to set:: config['interp']['order'] = 2 :param config: The configuration dict to which to write the key,value pairs. :param varaibles: A list of (typically command line) variables to parse. :param logger: A logger object for logging debug info. """ # Note: This is basically a copy of the GalSim function ParseVariables in the galsim.py script. new_params = {} for v in variables: logger.debug('Parsing additional variable: %s',v) if '=' not in v: raise ValueError('Improper variable specificationi: %s. Use field.item=value.'%v) key, value = v.split('=',1) try: # Use YAML parser to evaluate the string in case it is a list for instance. value = yaml.safe_load(value) except yaml.YAMLError as e: # pragma: no cover logger.warning('Caught %r',e) logger.warning('Unable to parse %s. Treating it as a string.',value) new_params[key] = value galsim.config.UpdateConfig(config, new_params)
[docs]def read_config(file_name): """Read a configuration dict from a file. :param file_name: The file name from which the configuration dict should be read. """ with open(file_name) as fin: config = yaml.safe_load(fin.read()) return config
def process(config, logger=None): """Build a Piff model according to the specifications in a config dict. Note: This is almost the same as the piffify function/executable. The difference is that it returns the resulting psf, rather than writing it to a file. :param config: The configuration file that defines how to build the model :param logger: A logger object for logging progress. [default: None] :returns: the psf model """ from .input import Input from .select import Select from .psf import PSF if logger is None: verbose = config.get('verbose', 1) logger = setup_logger(verbose=verbose) for key in ['input', 'psf']: if key not in config: raise ValueError("%s field is required in config dict"%key) # Import extra modules if requested if 'modules' in config: galsim.config.ImportModules(config) # We used to allow a bunch of items in config['input'], which now belong in config['select'] # For the 1.x series, allow the old API, but give a warning. select_keys = set(Select.base_keys) user_input_keys = set(config['input'].keys()) depr_keys = select_keys & user_input_keys if len(depr_keys) > 0: logger.error("WARNING: Items %r should now be in the 'select' field of the config file.", sorted(depr_keys)) if 'select' not in config: config['select'] = {} for key in depr_keys: config['select'][key] = config['input'].pop(key) # read in the input images objects, wcs, pointing = Input.process(config['input'], logger=logger) stars = Select.process(config.get('select',{}), objects, logger=logger) psf = PSF.process(config['psf'], logger=logger) psf.fit(stars, wcs, pointing, logger=logger) # Attach these for reference psf.initial_objects = objects psf.initial_stars = stars return psf
[docs]def piffify(config, logger=None): """Build a Piff model according to the specifications in a config dict. This includes writing the model to disk according to the output field. If you would rather get the psf object in return, see the process function. :param config: The configuration file that defines how to build the model :param logger: A logger object for logging progress. [default: None] """ from .output import Output for key in ['input', 'output', 'psf']: if key not in config: raise ValueError("%s field is required in config dict"%key) if logger is None: verbose = config.get('verbose', 1) logger = setup_logger(verbose=verbose) psf = process(config, logger) # write it out to a file output = Output.process(config['output'], logger=logger) output.write(psf, logger=logger)
def plotify(config, logger=None): """Take a Piff model, load in images, and execute output. :param config: The configuration file that defines how to build the model :param logger: A logger object for logging progress. [default: None] """ import piff from .psf import PSF from .output import Output from .star import Star if logger is None: verbose = config.get('verbose', 1) logger = setup_logger(verbose=verbose) for key in ['input', 'output']: if key not in config: raise ValueError("%s field is required in config dict"%key) for key in ['file_name']: if key not in config['output']: raise ValueError("%s field is required in config dict output"%key) # Import extra modules if requested if 'modules' in config: galsim.config.ImportModules(config) # load psf by looking at output file file_name = config['output']['file_name'] if 'dir' in config['output']: file_name = os.path.join(config['output']['dir'], file_name) logger.info("Looking for PSF at %s", file_name) psf = PSF.read(file_name, logger=logger) # The stars we care about for plotify are psf.stars, not what would be made from scratch by # processing the input and select fields. For one, there may be a random component to that # process, which would be different this time. But also, some input objects might have been # removed as outliers, so we don't want to include them here. # However, the psf.stars do not have the images loaded (to save space in the output file # so it's not enormous). So we need to load in the images for the stats. input_handler_class = getattr(piff, 'Input' + config['input'].get('type','Files')) input_handler = input_handler_class(config['input'], logger) stars = input_handler.load_images(psf.stars, logger=logger) # We don't want to rewrite the PSF to disk, so jump straight to the stats_list output = Output.process(config['output'], logger=logger) logger.debug("stats_list = %s",output.stats_list) for stats in output.stats_list: stats.compute(psf,stars,logger=logger) stats.write(logger=logger) def meanify(config, logger=None): """Take Piff output(s), build an average of the FoV, and write output average. :param config: The configuration file that defines how to build the model :param logger: A logger object for logging progress. [default: None] """ from .star import Star import glob import numpy as np from scipy.stats import binned_statistic_2d import fitsio if logger is None: verbose = config.get('verbose', 1) logger = setup_logger(verbose=verbose) for key in ['output', 'hyper']: if key not in config: raise ValueError("%s field is required in config dict"%key) for key in ['file_name']: if key not in config['output']: raise ValueError("%s field is required in config dict output"%key) for key in ['file_name']: if key not in config['hyper']: raise ValueError("%s field is required in config dict hyper"%key) if 'dir' in config['output']: dir = config['output']['dir'] else: dir = None if 'bin_spacing' in config['hyper']: bin_spacing = config['hyper']['bin_spacing'] #in arcsec else: bin_spacing = 120. #default bin_spacing: 120 arcsec if 'statistic' in config['hyper']: if config['hyper']['statistic'] not in ['mean', 'median']: raise ValueError("%s is not a suported statistic (only mean and median are currently " "suported)"%config['hyper']['statistic']) else: stat_used = config['hyper']['statistic'] else: stat_used = 'mean' #default statistics: arithmetic mean over each bin if 'params_fitted' in config['hyper']: if type(config['hyper']['params_fitted']) != list: raise TypeError('must give a list of index for params_fitted') else: params_fitted = config['hyper']['params_fitted'] else: params_fitted = None if isinstance(config['output']['file_name'], list): psf_list = config['output']['file_name'] if len(psf_list) == 0: raise ValueError("file_name may not be an empty list") elif isinstance(config['output']['file_name'], str): file_name = config['output']['file_name'] if dir is not None: file_name = os.path.join(dir, file_name) psf_list = sorted(glob.glob(file_name)) if len(psf_list) == 0: raise ValueError("No files found corresponding to "+config['output']['file_name']) else: raise ValueError("file_name should be either a list or a string") logger.debug('psf_list = %s',psf_list) npsfs = len(psf_list) logger.debug('npsfs = %d',npsfs) config['output']['file_name'] = psf_list file_name_in = config['output']['file_name'] logger.info("Looking for PSF at %s", file_name_in) file_name_out = config['hyper']['file_name'] if 'dir' in config['hyper']: file_name_out = os.path.join(config['hyper']['dir'], file_name_out) coords = [] params = [] for fi, f in enumerate(file_name_in): logger.debug('Loading file {0} of {1}'.format(fi, len(file_name_in))) fits = fitsio.FITS(f) coord, param = Star.read_coords_params(fits, 'psf_stars') fits.close() coords.append(coord) params.append(param) params = np.concatenate(params, axis=0) coords = np.concatenate(coords, axis=0) logger.info('Computing average for {0} params with {1} stars'.format(len(params[0]), len(coords))) if params_fitted is None: params_fitted = range(len(params[0])) lu_min, lu_max = np.min(coords[:,0]), np.max(coords[:,0]) lv_min, lv_max = np.min(coords[:,1]), np.max(coords[:,1]) nbin_u = int((lu_max - lu_min) / bin_spacing) nbin_v = int((lv_max - lv_min) / bin_spacing) binning = [np.linspace(lu_min, lu_max, nbin_u), np.linspace(lv_min, lv_max, nbin_v)] nbinning = (len(binning[0]) - 1) * (len(binning[1]) - 1) params0 = np.zeros((nbinning, len(params[0]))) Filter = np.array([True]*nbinning) for i in range(len(params[0])): if i in params_fitted: average, u0, v0, bin_target = binned_statistic_2d(coords[:,0], coords[:,1], params[:,i], bins=binning, statistic=stat_used) average = average.T average = average.reshape(-1) Filter &= np.isfinite(average).reshape(-1) params0[:,i] = average # get center of each bin u0 = u0[:-1] + (u0[1] - u0[0])/2. v0 = v0[:-1] + (v0[1] - v0[0])/2. u0, v0 = np.meshgrid(u0, v0) coords0 = np.array([u0.reshape(-1), v0.reshape(-1)]).T # remove any entries with nan (counts == 0 and non finite value in # the 2D statistic computation) coords0 = coords0[Filter] params0 = params0[Filter] dtypes = [('COORDS0', coords0.dtype, coords0.shape), ('PARAMS0', params0.dtype, params0.shape), ] data = np.empty(1, dtype=dtypes) data['COORDS0'] = coords0 data['PARAMS0'] = params0 logger.info('Writing average solution to {0}'.format(file_name_out)) with fitsio.FITS(file_name_out,'rw',clobber=True) as f: f.write_table(data, extname='average_solution')