Source code for showapi.level0.showlevel0

import numpy as np
import math
import xarray
from show_config.show_configuration import Parameters
from .l0algorithms import  L0Algorithms
from typing import Pattern, List, Dict, Tuple, Any, NamedTuple, Union

# ------------------------------------------------------------------------------
#           L0ImageStats
#------------------------------------------------------------------------------

L0ImageStats = NamedTuple('L0ImageStats', [('average', np.ndarray), ('error',np.ndarray), ('stddev', np.ndarray), ('median', np.ndarray)])
""" A simple namedtuple to hold image statistics.

.. py:attribute:: average

    The average of all the images on a pixel by pixel basis. The average is an instance of :class:`showapi.level0.level0imagearray.Level0_Image` so you have to access to averaged header information as well as theimage. Access statsobj.average.image if
    you want to get at the numpy 2-d image


.. py:attribute:: stddev

    The standard deviation of all images expressed as a numpy 2-D array

.. py:attribute:: error

    The error of all images expressed as a numpy 2-D array. This is typically the standard deviation devided by the square oot of the number of images.

.. py:attribute:: median

    The median value of all images that contributed to the statistics, This can be useful if you think the statistics are being skewed by a bad image.
"""

#------------------------------------------------------------------------------
#           class SHOWLevel0:
#
#------------------------------------------------------------------------------

[docs]class SHOWLevel0: #------------------------------------------------------------------------------ # SHOWLevel0:: __init__ #------------------------------------------------------------------------------
[docs] def __init__(self, instrumentname : str, netcdf_filename : str, groupname : str) : """ Constructs the SHOW level 0 object for the given instrument and optionally load in images from one or more directories :param instrumentname: The name of the instrument configuration. This is used to lookup a yaml configuration file stored in the config folder. The configuration file will indicate the file format to be be used to read the images. Valid values include 1. **timmins_2014** for data collected by the original SHOW instrument used for the Timmins 2014 balloon flight 2. **er2_2017** for the SHOW instrument using the XIPHOS telemetry system developed for the ER2 flight in 2017. See notes on :ref:`xiphos_directory_format` below 3. **uofs_dec2016** for images collected with the EPIX framegrabber at Univ of Sasktachewan in December 2016 :param dirnames: default is None. If not None then *dirnames* will be either one list_of_files or a list of directories. The new instance will load images from all of the directories. Each list_of_files is specified as a string. If *dirnames* is None then no images are loaded and the user may call :meth:`read_level0` at a later time to read in images. :param collection: default is None. If not None then SHOW Level 0 object will be assigned this collection of images """ self.params = Parameters( instrumentname ) # type: Parameters self.xarrayvar = None self.time = None self.exposure_time = None self.temperatures = None self.high_gain = None self.netcdf_filename = netcdf_filename self.groupname = groupname
# if ( netcdf_filename is not None): # self.open( netcdf_filename, groupname) # ------------------------------------------------------------------------------ # SHOWLevel0::__len__ # ------------------------------------------------------------------------------ def __len__(self): return self.numrecords() def numrecords(self): return len(self.time) if self.time is not None else 0 #------------------------------------------------------------------------------ # def open(self, filename : str, groupname : str): #------------------------------------------------------------------------------ def open(self): self.xarrayvar = xarray.open_dataset( self.netcdf_filename, decode_cf=True, group=self.groupname) self.heightrow = self.xarrayvar.heightrow self.pixelrow = self.xarrayvar.pixelrow self.temperature_labels = self.xarrayvar.temperature_labels self.time = self.xarrayvar.time self.exposure_time = self.xarrayvar.exposure_time self.temperatures = self.xarrayvar.temperatures self.high_gain = self.xarrayvar.high_gain self.image = self.xarrayvar.image #------------------------------------------------------------------------------ # close #------------------------------------------------------------------------------ def close(self): if self.xarrayvar is not None: self.xarrayvar.close() self.xarrayvar = None #------------------------------------------------------------------------------ # __enter__ #------------------------------------------------------------------------------ def __enter__(self): self.open() return self #------------------------------------------------------------------------------ # __exit__ #------------------------------------------------------------------------------ def __exit__(self, type, value, traceback): self.close() #------------------------------------------------------------------------------ # SHOWLevel0::algorithms #------------------------------------------------------------------------------
[docs] def algorithms(self ) -> L0Algorithms : """ Fetch the Level0 algorithm object associated with this Level 0 object :return: A :class:`~showapi.level0.l0algorithms.L0Algorithms` object. """ return L0Algorithms(self.params)
#------------------------------------------------------------------------------ # SHOWLevel0::integration_times #------------------------------------------------------------------------------
[docs] def unique_integration_times(self)->List[float]: """ Returns the list of unique integration times in microseconds in this Level 0 object :return: the list of integration times """ uniquetimes = np.unique( self.exposure_time ) return list( uniquetimes )
#------------------------------------------------------------------------------ # def instrument_name(self): #------------------------------------------------------------------------------
[docs] def instrument_name(self) -> str: """ Return the instrument name of this Level 0 object """ return self.params.instrumentname
#------------------------------------------------------------------------------ # def image_at(self, index: int) ->np.ndarray: #------------------------------------------------------------------------------
[docs] def image_at(self, index: Union[ int, List[int]], dtype='f8' ) ->np.ndarray: """ Returns the image at locations given by index as a floating point array :param index: :return: """ return self.image[index, :, :].values.astype(dtype)
#------------------------------------------------------------------------------ # def average_images(self, ): #------------------------------------------------------------------------------
[docs] def average_images( self, index : List[int] = None, darkcurrent : Dict[float,L0ImageStats] = None, flatfield : np.ndarray = None) -> L0ImageStats: """ Averages all of the images in the collection and returns the average, standard deviation and error. Note that you will probably want to sort data by exposure time first as we do not to divide by exposure time :param darkcurrent: a dictionary of dark current statistics for differnt exposure times. The dark current will be found that matches each images exposure time and subtracted form the image befor ethe image is averaged etc. Default is None which means no dark current correction is applied. :return: three element tuple[ average image and header, error image, standard deviation of image]. :rtype: Tuple[ Level0_Image, np.ndarray, np.ndarray ] """ avg = None sd = None err = None n = 0.0 if (index is None): index = np.arange(0,self.numrecords()) median = np.zeros((self.numrecords(),)) for i in index: imge = self.correct_image_ff_and_dc( i, darkcurrent=darkcurrent, ff=flatfield) if avg is None: avg = imge sd = imge * imge else: avg = avg + imge sd = sd + imge * imge n += 1.0 median[i] = np.median(imge) if n > 0.0: avg = avg / n # get the average signa q = (sd - n * ( avg * avg)) / n # Get the mean square deviation, (I avoid the n-1 form so it still works if we have only 1 image) q[np.where( q < 0.0)] = 0.0 # Check for a few bad points that might stray ever so slightly less than 0.0, it throws the sqrt code sd = np.sqrt(q) # Get the root mean square deviation ne = math.sqrt(n - 1) if (n > 1) else 1 err = sd / ne # Get the error estimate on the average value. results = L0ImageStats(average=avg, error=err, stddev=sd, median=median) return results
#------------------------------------------------------------------------------ # SHOWLevel0::make_dark_current #------------------------------------------------------------------------------
[docs] def make_dark_current( self, netcdf_filename: str, groupname: str ) -> Dict[ float, L0ImageStats]: """ Creates an internal dark current, :class:`~showapi.level0.showlevel0.L0ImageStats`, object from the images in the list of directories. A dark current entry is created for ewach unique exposure time. The user is responsible for ensuring that all the images in each list_of_files have good dark current images. No attempt is made to detect and eliminate outlier problems. Note that it can take several minutes to process the directories if they contain hundreds or thousands of images. :param dark_dirnames: Either a string or a list of strings. Each string is the name of a dark current image list_of_files used to make the dark current statistics. :return: None """ l0dark = SHOWLevel0( self.instrument_name(), netcdf_filename, groupname) unique_times = list( np.unique( l0dark.exposure_time ) ) stats = {} for t in unique_times: index, = np.where( l0dark.exposure_time == t) s = l0dark.average_images( index = index) stats[t] = s return stats
#------------------------------------------------------------------------------ # make_flat_field #------------------------------------------------------------------------------
[docs] def make_flat_field( self, arma_netcdf_filename : str, arma_groupname : str, armb_netcdf_filename : str, armb_groupname : str, dark_stats : Dict[ float, L0ImageStats]) -> np.ndarray: """ Creates an internal flat-field, :class:`~showapi.level0.showlevel0.L0ImageStats`, object from the average of Arm A and Arm B white light images. A flat-field entry is created for each unique exposure time. Dark current is removed both Arm A and Arm B using the results from a previous call to make_dark_current. The user is responsible for ensuring that all the images in each list_of_files have good flat-field behaviour and that there are sensible amounts of Arm A and Arm B measurements (ie approx equal amounts). No attempt is made to detect and eliminate outlier problems. It can take several minutes to process the directories if they contain hundreds or thousands of images. :param arma_netcdf_filename: The name of the netcdf file containing raw, level 0, arm A images. :param arma_groupname: The name of the group inside the netcdf file containing the raw, level 0, arm A images. :param armb_netcdf_filename: The name of the netcdf file containing raw, level 0, arm B images. :param armb_groupname: The name of the group inside the netcdf file containing the raw, level 0, arm B images. :param dark_stats: The dark current image statistics. The dark current stats should have exposure times (and temperatures which are not checked) that match the exposure times of the flat-field images. :return: np.ndarray """ if (arma_groupname is None) and (armb_groupname is None): return 1.0 armadata = SHOWLevel0( self.instrument_name(), arma_netcdf_filename, arma_groupname) armbdata = SHOWLevel0( self.instrument_name(), armb_netcdf_filename, armb_groupname) statsa = armadata.average_images(darkcurrent=dark_stats) statsb = armbdata.average_images(darkcurrent=dark_stats) image = statsa.average + statsb.average # sum the two images maxsig = np.percentile( image, 98) # get a value near the max, but watch out for one or two bad points image = image/maxsig # renormalize the image. Its a little sloppy for absolute calibration but its ok for now bad = np.where(image < 0.2) # mask out points that will need a correction of 5 or more image[bad] = 1.0e6 # set the bad values so they dont generate a math warning error ffimage = 1.0/image # Get the reciprocal of the flat field ffimage[bad] = 0.0 # and mask out the bad pixels so they dont contribute return ffimage
#------------------------------------------------------------------------------ # SHOWLevel0::correct_image_ff_and_dc #------------------------------------------------------------------------------ def correct_image_ff_and_dc( self, index : int, darkcurrent : Dict[float,L0ImageStats] = None, ff : np.ndarray = None) -> np.ndarray : image = self.image_at(index) exposure_time = float(self.exposure_time[index]) fpa_temp = float(self.temperatures[index,0]) if (darkcurrent is not None): dcstats = darkcurrent.get(exposure_time) if (dcstats is not None): image -= dcstats.average else: print( 'WARNING, Level0_ImageCollection::correct_image_ff_and_dc could not find a dark current for exposure time %8.3f millisecs. Dark current is not subtracted' % (exposure_time / 1000.0,)) if (ff is not None): image *= ff return image