# This code follows the Gemini standard for other instruments
# even though pylint doesn't like it.
# pylint: disable=no-self-use, inconsistent-return-statements
"""
This module contains the AstroDataGhost class, used for adding tags
and descriptors to GHOST data.
"""
from astrodata import (
astro_data_tag,
TagSet,
astro_data_descriptor,
)
from gemini_instruments.gemini import AstroDataGemini
from gemini_instruments.common import build_group_id
[docs]
def return_dict_for_bundle(desc_fn):
"""
A decorator that will return a dict with keys "blue" and "red" and values
equal to the descriptor return if it was sent the split files. A check is
made that all the returns are equal, and None is returned otherwise.
This works by splitting the bundle and evaluating the descriptor on all of
the blue and then red arms in turn. It therefore works regardless of whether
the required information is in the extensions or the nascent PHU.
Its behaviour on multi-extension slices is unclear.
"""
def wrapper(self, *args, **kwargs):
if not self.is_single and 'BUNDLE' in self.tags:
ret_dict = dict()
cameras = self.hdr.get('CAMERA', '')
namps = self.hdr.get('NAMPS')
i = 0
while i < len(namps):
camera = cameras[i].lower()
# We've found the nascent PHU of an arm exposure
if namps[i] > 1:
ret_value = desc_fn(self[i+1:i+namps[i]+1], *args, **kwargs)
i += namps[i] + 1
else: # it's a slitviewer
ret_value = desc_fn(self[i], *args, **kwargs)
i += 1
# Check for single-valuedness of all returns from this camera
try:
if ret_dict[camera] != ret_value:
ret_dict[camera] = None
except KeyError:
ret_dict[camera] = ret_value
return ret_dict
return desc_fn(self, *args, **kwargs)
return wrapper
[docs]
def use_nascent_phu_for_bundle(desc_fn):
"""
A decorator for bundles (where the PHU is minimal) that will instead
provide the first nascent PHU (of a red/blue arm) instead
"""
def wrapper(self, *args, **kwargs):
if 'BUNDLE' in self.tags:
phu_index = min(i for i, camera in enumerate(self.hdr['CAMERA'])
if camera in ('BLUE', 'RED'))
nascent_phu = self[phu_index]
return desc_fn(nascent_phu, *args, **kwargs)
return desc_fn(self, *args, **kwargs)
return wrapper
[docs]
class AstroDataGhost(AstroDataGemini):
"""
Class for adding tags and descriptors to GHOST data.
"""
__keyword_dict = dict(array_section = 'CCDSEC',
array_name = 'AMPNAME',
overscan_section = 'BIASSEC',
res_mode = 'SMPNAME',
exposure_time = 'EXPTIME',
saturation_level = 'SATURATE',
)
def __iter__(self):
if self.is_single:
yield self
else:
for n in range(len(self)):
yield self[n]
def __getitem__(self, idx):
"""
Override default slicing method for bundles for two reasons:
1) Prevent creation of a new AD from non-contiguous extensions, as
this doesn't make sense.
2) Prevent creation of a new "Frankenstein" AD with data from more
than one camera, as only original bundles should have this.
"""
obj = super().__getitem__(idx)
if 'BUNDLE' in self.tags:
#print("BUNDLE", idx, obj._mapping)
if max(obj.indices) - min(obj.indices) != len(obj.indices) - 1:
raise ValueError("Bundles can only be sliced contiguously")
#print(obj.tags)
# Find the nascent PHU if it's not a slit viewer, keep PHU if it is
# this copes with CAMERA return a list or a string (single-slice)
if obj.hdr.get('CAMERA')[0].startswith('S'):
# add in the first SLITV header so keywords are there
# (which is explicitly done in debundling)
objhdr = obj.hdr if obj.is_single else obj[0].hdr
phu = self.phu + objhdr
# because people just do things without understanding the ramifications
phu['EXPTIME'] = objhdr['EXPTIME']
obj.phu = phu
return obj
for i in range(min(obj.indices), -1, -1):
ndd = self._all_nddatas[i]
if not ndd.shape:
phu = ndd.meta['header']
break
else:
phu = self._all_nddatas[min(obj.indices)].meta['header']
obj.phu = phu
# This has to happen way down here because only by resetting the PHU
# can we prevent the 'BUNDLE' tag appearing and hence allow more
# slicing to occur
if len(set(ext.shape for ext in obj)) > 1:
raise ValueError("Bundles must be sliced from the same camera")
#print("RETURNING", len(obj))
return obj
[docs]
@astro_data_descriptor
def data_label(self):
"""
Returns the data label of an observation.
Returns
-------
str
the observation's data label
"""
if super().data_label() is None:
raise RuntimeError("Your data has no DATALAB header keyword")
return super().data_label()
@staticmethod
def _matches_data(source):
"""
Check if data is from GHOST.
Parameters
----------
source : astrodata.AstroData
The source file to check.
"""
return source[0].header.get('INSTRUME', '').upper() == 'GHOST'
@astro_data_tag
def _tag_instrument(self):
"""
Define the minimal tag set for GHOST data.
"""
return TagSet(['GHOST'])
@astro_data_tag
def _tag_bundle(self):
"""
Define the 'bundled data' tag set for GHOST data.
"""
# Gets blocked by tags created by split files
return TagSet(['BUNDLE'])
@astro_data_tag
def _tag_bias(self):
"""
Define the 'bias data' tag set for GHOST data.
"""
if self.phu.get('OBSTYPE') == 'BIAS':
return TagSet(['CAL', 'BIAS'], blocks=['IMAGE', 'SPECT'])
@astro_data_tag
def _tag_dark(self):
"""
Define the 'dark data' tag set for GHOST data.
"""
if self.phu.get('OBSTYPE') == 'DARK':
return TagSet(['CAL', 'DARK'], blocks=['IMAGE', 'SPECT'])
@astro_data_tag
def _tag_arc(self):
"""
Define the 'arc data' tag set for GHOST data.
"""
if self.phu.get('OBSTYPE') == 'ARC':
return TagSet(['CAL', 'ARC'])
@astro_data_tag
def _tag_flat(self):
"""
Define the 'flat data' tag set for GHOST data.
"""
if self.phu.get('OBSTYPE') == 'FLAT':
return TagSet(['CAL', 'FLAT'])
@astro_data_tag
def _tag_slitflat(self):
"""
Define the 'slitflat data' tag set for GHOST data.
"""
if (self.phu.get('OBSTYPE') == 'FLAT' and
self.phu.get('CAMERA', '').lower().startswith('slit')):
return TagSet(['CAL', 'SLITFLAT'])
@astro_data_tag
def _tag_sky(self):
"""
Define the 'flat data' tag set for GHOST data.
"""
if self.phu.get('OBSTYPE') == 'SKY':
return TagSet(['SKY'])
# MCW 191107 - had to add SLIT back in to make cal system work
@astro_data_tag
def _tag_slitv(self):
"""
Define the 'slit data' tag set for GHOST data.
"""
if self.phu.get('CAMERA', '').lower().startswith('slit'):
return TagSet(['SLITV', 'SLIT'],
blocks=['SPECT', 'BUNDLE'])
@astro_data_tag
def _tag_image(self):
"""
Tag slitviewer images as IMAGE (must be done separately from
the "SLITV" so "SLITV" isn't blocked by BIAS or BPM)
"""
if self.phu.get('CAMERA', '').lower().startswith('slit'):
return TagSet(['IMAGE'])
@astro_data_tag
def _tag_camera(self):
"""
Define the 'spectrograph data' tag set for GHOST data.
"""
# Also returns BLUE or RED if the CAMERA keyword is set thus
camera = self.phu.get('CAMERA')
if camera in ('BLUE', 'RED'):
return TagSet([camera], blocks=['BUNDLE'])
@astro_data_tag
def _tag_spect(self):
"""
Tag echelle frames as SPECT (must be done separately from
"RED"/"BLUE" so that isn't blocked by BIAS or BPM)
"""
camera = self.phu.get('CAMERA')
if camera in ('BLUE', 'RED'):
return TagSet(['SPECT', 'XD'], blocks=['BUNDLE'])
@astro_data_tag
def _status_processed_ghost_cals(self):
"""
Define the 'processed data' tag set for GHOST data.
"""
kwords = set(['PRSLITIM', 'PRSLITBI', 'PRSLITDA', 'PRSLITFL',
'PRWAVLFT', 'PRPOLYFT', 'PROCSTND'])
if set(self.phu) & kwords:
return TagSet(['PROCESSED'])
@astro_data_tag
def _tag_processed_standard(self):
if 'PROCSTND' in self.phu:
return TagSet(['STANDARD'])
#@astro_data_tag
#def _tag_binning_mode(self):
# """
# TODO: this should not be a tag
# Define the tagset for GHOST data of different binning modes.
# """
# binnings = self.hdr.get('CCDSUM')
# if binnings is None: # CJS hack
# return TagSet([])
# if isinstance(binnings, list):
# binnings = [x for x in binnings if x]
# if all(x == binnings[0] for x in binnings):
# return TagSet([binnings[0].replace(' ', 'x', 1)])
# else:
# return TagSet(['NxN'])
# else:
# # A list should always be returned but it doesn't
# # hurt to be able to handle a string just in case
# return TagSet([binnings.replace(' ', 'x', 1)])
@astro_data_tag
def _tag_obsclass(self):
"""
Define the tagset for 'partnerCal' observations.
"""
if self.phu.get('OBSCLASS') == 'partnerCal':
return TagSet(['PARTNER_CAL'])
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def amp_read_area(self, pretty=False):
"""
Returns a list of amplifier read areas, one per extension, made by
combining the amplifier name and detector section; or, returns a
string if called on a single-extension slice.
Note: this descriptor is only used for calibration association
purposes in the archive, not during DR, so for pragmatic reasons
we will compress a list of identical elements to a single-element list
to aid the archive code when matching SLITV calibrations (which will
only have one extension)
Returns
-------
list/str
read_area of each extension
"""
# Note that tiled arrays won't have an array_name, so we'll fake it
# FIXME correctly fetch keyword for tileArrays primitive
if self.phu.get('TILEARRY', None) is not None:
ampname = [0, ]
else:
ampname = self.array_name()
detsec = self.detector_section(pretty=True)
# Combine the amp name(s) and detector section(s)
if self.is_single:
return "'{}':{}".format(ampname,
detsec) if ampname and detsec else None
else:
ret_value = ["'{}':{}".format(a,d)
if a is not None and d is not None else None
for a,d in zip(ampname, detsec)]
if ret_value == ret_value[::-1]:
return ret_value[:1]
return ret_value
[docs]
@astro_data_descriptor
def arm(self):
"""
Returns a string indicating whether these data are from the red
or blue arm of the spectrograph.
Returns
-------
str/None
Color of the arm (`'blue'`, `'red'`), or `'slitv'` in case of slit
viewer data. Returns `None` if arm/slit status can't be determined.
"""
tags = self.tags
if 'BLUE' in tags:
return 'blue'
elif 'RED' in tags:
return 'red'
elif 'SLITV' in tags:
return 'slitv'
return None
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def array_name(self):
"""
Return the arr
Returns
-------
str: a concatenated string of the detector name and amplifier
"""
if self.is_single:
return f"{self.detector_name()}, {self.hdr.get('AMPNAME')}"
else:
return [f"{ext.detector_name()}, {ext.hdr.get('AMPNAME')}" for ext in self]
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def binning(self):
"""
Returns an "MxN"-style string because CJS is fed up with not having this!
"""
return super().binning()
[docs]
@astro_data_descriptor
def calibration_key(self):
"""
Returns a suitable calibration key for GHOST, which includes the arm.
"""
return (self.data_label().replace('_stack', ''), self.arm())
# FIXME Remove once headers corrected
[docs]
@astro_data_descriptor
def central_wavelength(self, asMicrometers=False, asNanometers=False,
asAngstroms=False): # pragma: no cover
"""
Dummy to work around current Gemini cal_mgr
"""
val = self.phu.get(self._keyword_for('central_wavelength'), None)
if val is None:
if self.arm() == 'red':
val = 4000. * 10**-10
elif self.arm() == 'blue':
val = 6000. * 10**-10
else:
return None
if asMicrometers:
val *= 10**6
elif asNanometers:
val *= 10**9
elif asAngstroms:
val *= 10**10
return float(val)
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def detector_name(self, pretty=False):
"""
Returns the detector (CCD) name.
"""
return self.phu.get('DETECTOR')
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def detector_x_bin(self):
"""
Returns the detector binning in the x-direction.
Returns
-------
int
The detector binning
"""
def _get_xbin(binning):
try:
return int(binning.split()[0])
except (AttributeError, ValueError):
return None
binning = self.hdr.get('CCDSUM')
if self.is_single:
return _get_xbin(binning)
else:
xbin_list = [_get_xbin(b) for b in binning]
# Check list is single-valued
return xbin_list[0] if xbin_list == xbin_list[::-1] else None
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def detector_y_bin(self):
"""
Returns the detector binning in the y-direction.
Returns
-------
int
The detector binning
"""
def _get_ybin(binning):
try:
return int(binning.split()[1])
except (AttributeError, ValueError, IndexError):
return None
binning = self.hdr.get('CCDSUM')
if self.is_single:
return _get_ybin(binning)
else:
ybin_list = [_get_ybin(b) for b in binning]
# Check list is single-valued
return ybin_list[0] if ybin_list == ybin_list[::-1] else None
# TODO: GHOST descriptor returns no values if data are unprepared
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def exposure_time(self):
"""
Returns the exposure time. If run on a bundle, it returns the exposure
time of a single exposure in each arm, NOT the total exposure time
Returns
-------
int
exposure time of a single exposure
"""
return super().exposure_time()
# Don't let this special logic happen for bundles
#if 'BUNDLE' not in self.tags:
# if exp_time_default is None:
# exposure_time = self[0].hdr.get(
# self._keyword_for('exposure_time'),
# -1)
# if exposure_time == -1:
# return None
# return exposure_time
#
#return exp_time_default
[docs]
@astro_data_descriptor
def focal_plane_mask(self, *args, **kwargs):
"""
Returns the "focal plane mask", primarily to populate the archive's
Header table so it can be searched on.
Returns
-------
str
"HR"/"SR" as appropriate
"""
try:
return self.res_mode().upper()[0]+"R"
except AttributeError:
return None
# The gain() descriptor is inherited from gemini/adclass, and returns
# the value of the GAIN keyword (as a list if sent a complete AD object,
# or as a single value if sent a slice). This is what the GHOST version
# does so one is not needed here.
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def gain_setting(self):
"""
Returns the gain setting for this observation (e.g., 'high', 'low')
Returns
-------
str
the gain setting
"""
# TODO: confirm returns. Future-proofing for a possible high-gain mode
if 'SLITV' in self.tags:
return "standard"
gain = self.gain()
if self.is_single:
return "low" if gain < 1.0 else "high"
low_gain = [g <= 1.0 for g in gain]
if all(low_gain):
return "low"
elif any(low_gain):
raise ValueError("Some gains are low and some are high")
return "high"
[docs]
@astro_data_descriptor
def group_id(self):
"""
Returns a string representing a group of data that are compatible
with each other. This is used when stacking, for example. Each
instrument, mode of observation, and data type will have its own rules.
Returns
-------
str
A group ID for compatible data
"""
tags = self.tags
if 'DARK' in tags:
desc_list = ['exposure_time', 'coadds']
elif 'BIAS' in tags:
desc_list = []
else: # science exposures (and ARCs)
desc_list = ['observation_id', 'res_mode']
desc_list.append('arm')
# never stack frames of mixed binning modes
desc_list.append('detector_x_bin')
desc_list.append('detector_y_bin')
# MCW: We care about the resolution mode EXCEPT for dark and bias
if 'DARK' not in tags and 'BIAS' not in tags:
desc_list.append('res_mode')
# CJS: Generally need to stop FLATs being stacked with science
additional_item = 'FLAT' if 'FLAT' in tags else None
return build_group_id(self, desc_list, prettify=[],
additional=additional_item)
[docs]
@astro_data_descriptor
def non_linear_level(self):
"""
Returns the level at which the data become non-linear. This is
the same as the saturation level for the GHOST CCDs.
Returns
-------
int/list
non-linearity level
"""
return self.saturation_level()
[docs]
@astro_data_descriptor
def number_of_exposures(self):
"""
Return the number of individual exposures
Returns
-------
int/dict
number of exposures
"""
if 'BUNDLE' not in self.tags:
return len(self) if 'SLITV' in self.tags else 1 # probably
if self.is_single:
return 1
cameras = self.hdr.get('CAMERA')
namps = self.hdr.get('NAMPS')
ret_value = {'blue': 0, 'red': 0, 'slitv': 0}
for camera, namp in zip(cameras, namps):
if namp is not None:
ret_value[camera.lower()] += 1
return ret_value
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def read_mode(self):
"""
Returns a string describing the read mode, matching that offered
in the OT. Only the readout speed will be configurable, so that
is what is returned, albeit in a circuitous way to future-proof.
Returns
-------
str
The read mode
"""
# TODO: get appropriate return values
_read_mode_dict = {("slow", "low"): "slow",
("medium", "low"): "medium",
("fast", "low"): "fast",
("fast", "high"): "bright",
("standard", "standard"): "standard"} # SLITV
return _read_mode_dict.get((self.read_speed_setting(),
self.gain_setting()), "unknown")
# TODO: read_noise(): see comments on gain()
[docs]
@astro_data_descriptor
@return_dict_for_bundle
def read_speed_setting(self):
"""
Returns the setting for the readout speed (slow or fast)
Returns
-------
str
The read speed ("slow"/"medium"/"fast")
"""
if 'SLITV' in self.tags:
return "standard"
return ("slow", "medium", "fast", "unknown")[self.phu.get('READMODE', 3)]
[docs]
@astro_data_descriptor
def res_mode(self):
"""
Get the GHOST resolution mode of this dataset
Returns
-------
str/None
Resolution of the dataset ('high' | 'std'). Returns `None` if
resolution mode cannot be determined.
"""
mode = self.phu.get('SMPNAME')
try:
if mode.endswith('HI_ONLY'):
return 'high'
elif (mode.endswith('LO_ONLY') or mode.endswith('STD_ONLY')):
return 'std'
except Exception:
pass
return None
[docs]
@astro_data_descriptor
def saturation_level(self):
"""Patch because SATURATE=0 for the blue spectrograph"""
retval = super().saturation_level()
if 'PREPARED' in self.tags:
return retval
if self.is_single:
return None if retval is None else retval if retval > 0 else 65535
return [None if v is None else v if v > 0 else 65535 for v in retval]
[docs]
@astro_data_descriptor
@use_nascent_phu_for_bundle
def ut_datetime(self, *args, **kwargs):
return AstroDataGemini.ut_datetime(self, *args, **kwargs)
[docs]
@astro_data_descriptor
def want_before_arc(self):
"""
This is a special descriptor which is being used as a calibration
system work-around. Outside of active reduction, this descriptor
should always return None, as the relevant header keyword should only
exist very briefly during the fetching of bracketed arc files.
Returns
-------
bool or `None`
"""
want_before = self.phu.get('ARCBEFOR', None)
if want_before:
return True
elif want_before is None:
return None
else:
return False