# Source code for astrodata.factory

import logging
import os
from contextlib import contextmanager
from copy import deepcopy

from astropy.io import fits

LOGGER = logging.getLogger(__name__)


class AstroDataError(Exception):
    """Raised when a source cannot be opened or resolved to an AstroData class."""
class AstroDataFactory:
    """Factory that picks the right AstroData class for a given dataset.

    Classes are registered with :meth:`addClass`. :meth:`getAstroData` asks
    every registered class whether it recognises the data (through its
    ``_matches_data`` attribute), keeps the most specific match and delegates
    the actual reading to that class' ``read``.
    """

    # Callables tried, in order, to open a file path. Each one receives the
    # path and returns a native file object (e.g. an ``HDUList``).
    _file_openers = (
        fits.open,
    )

    def __init__(self):
        # Set of registered classes; see ``addClass``.
        self._registry = set()

    @staticmethod
    @contextmanager
    def _openFile(source):
        """
        Internal static method that takes a ``source``, assuming that it is
        a string (or path-like object) pointing to a file to be opened.

        If this is the case, it will try to open the file with each of the
        callables in ``_file_openers`` and yield an instance of the
        appropriate native class to be able to manipulate it (eg.
        ``HDUList``). The object is closed when the ``with`` block exits,
        whether it exits normally or through an exception.

        If ``source`` is not a string or path-like object, it is yielded
        verbatim, assuming that it represents an already opened file; it is
        not closed on exit.

        Raises
        ------
        AstroDataError
            If none of the openers could open ``source``. The failure of the
            last opener tried is attached as the cause.
        OSError
            Propagated from ``os.stat`` (e.g. when the file does not exist).
        """
        if not isinstance(source, (str, os.PathLike)):
            yield source
            return

        stats = os.stat(source)
        if stats.st_size == 0:
            LOGGER.warning(f"File {source} is zero size")

        # Try every opener until one succeeds. Only the opener call itself
        # is guarded: keeping the ``yield`` outside of this ``try`` ensures
        # that errors raised inside the caller's ``with`` block are not
        # mistaken for "unsupported format" failures.
        last_error = None
        for func in AstroDataFactory._file_openers:
            try:
                fp = func(source)
            except Exception as err:
                # Assume that it is a not supported format and go for the
                # next opener.
                last_error = err
                continue
            break
        else:
            raise AstroDataError("No access, or not supported format for: {}"
                                 .format(source)) from last_error

        try:
            yield fp
        finally:
            # Always release the file, even if the ``with`` body failed.
            if hasattr(fp, 'close'):
                fp.close()

    def addClass(self, cls):
        """
        Add a new class to the AstroDataFactory registry. It will be used
        when instantiating an AstroData class for a FITS file.

        Parameters
        ----------
        cls : class
            Class to register. It must provide a ``_matches_data``
            attribute.

        Raises
        ------
        AttributeError
            If ``cls`` has no ``_matches_data`` attribute.
        """
        if not hasattr(cls, '_matches_data'):
            raise AttributeError("Class '{}' has no '_matches_data' method"
                                 .format(cls.__name__))

        self._registry.add(cls)

    def getAstroData(self, source):
        """
        Takes either a string (with the path to a file) or an HDUList as
        input, and tries to return an AstroData instance.

        It will raise exceptions if the file is not found, or if there is no
        match for the HDUList, among the registered AstroData classes.

        Returns an instantiated object, or raises AstroDataError if it was
        not possible to find a match.

        Parameters
        ----------
        source : `str` or `pathlib.Path` or `fits.HDUList`
            The file path or HDUList to read.

        Raises
        ------
        AstroDataError
            If the file cannot be opened, if no registered class matches the
            data, or if more than one unrelated class matches it.
        """
        candidates = []
        with self._openFile(source) as opened:
            for adclass in self._registry:
                try:
                    if adclass._matches_data(opened):
                        candidates.append(adclass)
                except Exception:
                    # Best effort: a class that fails while inspecting the
                    # data is treated as "does not match".
                    pass

        # For every candidate in the list, remove the ones that are base
        # classes for other candidates. That way we keep only the more
        # specific ones.
        final_candidates = [
            cnd for cnd in candidates
            if not any(cnd in other.mro()
                       for other in candidates if other != cnd)
        ]

        if len(final_candidates) > 1:
            raise AstroDataError("More than one class is candidate for this dataset")
        elif not final_candidates:
            raise AstroDataError("No class matches this dataset")

        # The file was opened only to find the matching class; the chosen
        # class receives the original ``source`` and reads it itself.
        return final_candidates[0].read(source)

    def createFromScratch(self, phu, extensions=None):
        """Creates an AstroData object from a collection of objects.

        Parameters
        ----------
        phu : `fits.PrimaryHDU` or `fits.Header` or `dict` or `list`
            FITS primary HDU or header, or something that can be used to
            create a fits.Header (a dict, a list of "cards"). If `None`, no
            primary HDU is added before the extensions.
        extensions : list of HDUs
            List of HDU objects.

        Raises
        ------
        ValueError
            If ``phu`` is not `None` and is not one of the supported types.
        """
        lst = fits.HDUList()
        if phu is not None:
            if isinstance(phu, fits.PrimaryHDU):
                # Copy so that the caller's HDU is not shared with the
                # new object.
                lst.append(deepcopy(phu))
            elif isinstance(phu, fits.Header):
                lst.append(fits.PrimaryHDU(header=deepcopy(phu)))
            elif isinstance(phu, (dict, list, tuple)):
                p = fits.PrimaryHDU()
                p.header.update(phu)
                lst.append(p)
            else:
                raise ValueError("phu must be a PrimaryHDU or a valid header object")

        # TODO: Verify the contents of extensions...
        if extensions is not None:
            for ext in extensions:
                lst.append(ext)

        return self.getAstroData(lst)