Source code for showapi.level1.l0_to_l1a_pipeline

from typing import Pattern, List, Dict, Tuple, Any, NamedTuple, Union
import numpy as np
import datetime
import re
import math
import logging
import os
import os.path
from show_config.show_configuration import Show_Configuration, read_configuration, cdb_directory
from showcdb.showcdb import SHOWCDB_DarkCurrent, SHOWCDB_DCBias, SHOWCDB_NUC, SHOWCDB_FlatField
import argcommon.netcdfutils
from ..level0.showlevel0 import SHOWLevel0
from .l1a_io import SHOW_Level1A_Output


g_logger = logging.getLogger("Level 0")

#------------------------------------------------------------------------------
#           class L0_to_L1A_Pipeline
#------------------------------------------------------------------------------

[docs]class L0_to_L1A_Pipeline(): EXCEPTION_FLAG_BAD_PIXEL = 0x0001 EXCEPTION_FLAG_SATURATED_PIXEL = 0x0002 #------------------------------------------------------------------------------ # L0_to_L1A_Pipeline::__init__ #------------------------------------------------------------------------------ def __init__(self, instrumentname : str, version: Tuple[int,int,int]): self.lastutc = datetime.datetime(1980,1,1) self.instrumentname = instrumentname # type: str self.versionmajor = version[0] self.versionminor = version[1] self.versiondetail = version[2] self.config = read_configuration( instrumentname) # type: Show_Configuration self.l0image = None # type: np.ndarray self.temperatures = None # type: np.ndarray self.fpa_temp = None # type: float self.utc = None # type: datetime.datetime self.temp_labels = None # type: List [ str ] self.l1image = None # type: np.ndarray self.l1error = None # type: np.ndarray self.standard_bad_pixels = None # type: List[ Tuple[int,int] ] self.standard_big_badpixels= None # type: List[ Tuple[ int, int, int, int]] self.jitter_pixels = None # type: List[ Tuple[int,int] ] self.dcbias = None # type: SHOWCDB_DCBias self.darkcurent = None # type: SHOWCDB_DarkCurrent self.nuc = None # type: SHOWCDB_NUC self.flatfield = None # type: SHOWCDB_FlatField self.sub_window = [ self.config["iFOV"]["Pixel_column_at_left_edge_of_grating_image"], self.config["iFOV"]["Pixel_column_at_right_edge_of_grating_image"], self.config["iFOV"]["Pixel_row_at_bottom_of_atmospheric_image"], self.config["iFOV"]["Pixel_row_at_top_edge_of_atmospheric_image"]] # type: Tuple[int, int, int, int] #------------------------------------------------------------------------------ # reset_datecheck #------------------------------------------------------------------------------
[docs] def reset_datecheck(self): """ Reset the internal flag used to detect data not collected at 1 Hz frame rate :return: """ self.lastutc = None
#------------------------------------------------------------------------------ # open_pipeline #------------------------------------------------------------------------------
[docs] def open_pipeline(self): # self.dcbias = SHOWCDB_DCBias ( self.instrumentname, self.config ) self.darkcurent = SHOWCDB_DarkCurrent( self.instrumentname, self.config ) self.nuc = SHOWCDB_NUC ( self.instrumentname, self.config ) self.flatfield = SHOWCDB_FlatField ( self.instrumentname, self.config ) self.load_standard_bad_pixels( self.config) self.load_jitter_pixels(self.config) return self
#------------------------------------------------------------------------------ # close_pipeline #------------------------------------------------------------------------------
[docs] def close_pipeline(self): # if self.dcbias is not None: self.dcbias.close() if self.darkcurent is not None: self.darkcurent.close() if self.nuc is not None: self.nuc.close() if self.flatfield is not None: self.flatfield.close() self.dcbias = None self.darkcurent = None self.nuc = None self.flatfield = None
#------------------------------------------------------------------------------ # __enter__ for use with the 'with' statement #------------------------------------------------------------------------------ def __enter__(self): self.open_pipeline() return self #------------------------------------------------------------------------------ # __exit__ for use with the 'with' statement #------------------------------------------------------------------------------ def __exit__(self, type, value, traceback): self.close_pipeline() #------------------------------------------------------------------------------ # L0_to_L1A_Pipeline::reset_pipeline #------------------------------------------------------------------------------
[docs] def reset_pipeline(self, l0:SHOWLevel0, i : int) -> bool: """ Reset the processing pipeline so it can process another image :param l0: :param i: :return: """ self.l0image = l0.image_at(i,dtype=np.uint16) self.temperatures = np.array(l0.temperatures[i, :]) self.fpa_temp = self.temperatures[0] self.utc = datetime.datetime.utcfromtimestamp( l0.time[i:i+1].values[0].astype('O')/1e9) self.exposure_time = l0.exposure_time[i:i+1].values[0] self.temp_labels = np.array(l0.temperature_labels) self.l1image = None self.l1error = None return True
#------------------------------------------------------------------------------ # load_standard_bad_pixels #------------------------------------------------------------------------------
[docs] def load_list_of_pixels(self, filename): """ Load a list of pixel indices from a simple text file :return: """ try: f = open(filename, 'rt') lines = f.readlines() f.close() except: g_logger.warning("L0_to_L1A_Pipeline:load_list_of_pixels, error reading list of pixels from file %s", filename) lines = [] pixels_list = [] ix = [] iy = [] for l in lines: m = re.match( r'(\d+)[ ,]+(\d+)', l ) if (m is not None): iy.append( int(m.group(1)) ) ix.append( int(m.group(2)) ) if (len(ix) > 0 ): pixels_list.append( iy ) pixels_list.append( ix ) return pixels_list
#------------------------------------------------------------------------------ # load_standard_bad_pixels #------------------------------------------------------------------------------
[docs] def load_standard_bad_pixels(self, config): """ Load the standard bad pixels from the calibration dtaabase. This is just a simple text file :return: """ self.standard_bad_pixels = config['badpixels']['small'] self.standard_big_badpixels = config['badpixels']['big']
#------------------------------------------------------------------------------ # load_jitter_pixels #------------------------------------------------------------------------------
[docs] def load_jitter_pixels(self, config): """ Load the standard bad pixels from the calibration dtaabase. This is just a simple text file :return: """ filename = cdb_directory(config) self.jitter_pixels = [] #self.load_list_of_pixels( filename)
#------------------------------------------------------------------------------ # process_pipeline #------------------------------------------------------------------------------
[docs] def process_pipeline(self) -> bool: """ Apply the Level 0 to Level 1A pipeline to the current level 0 image :return: """ ok = True ok = ok and self.mask_bits_14_and_15() ok = ok and self.check_normal_mode() ok = ok and self.initialize_readout_error() # ok = ok and self.image_jitter_validation() ok = ok and self.remove_dark_current() ok = ok and self.apply_shot_noise() ok = ok and self.mask_saturated_and_bad_pixels() ok = ok and self.sub_window_selection() ok = ok and self.nuc_correction() ok = ok and self.flat_field_correction() ok = ok and self.interpolate_bad_pixels() ok = ok and self.sqrt_error() ok = ok and self.normalize_integrationtime() # if (not ok): # g_logger.warning("L0_to_L1A_Pipeline:process_pipeline, rejecting record collected at %s", self.utc.strftime('%Y-%m-%d %H:%M:%S.%f') ) return ok
#------------------------------------------------------------------------------ # mask_bits_14_and_15 #------------------------------------------------------------------------------
[docs] def mask_bits_14_and_15(self): self.l1image = self.l0image self.l1image &= 0x3FFF self.l1image = self.l1image.astype( np.float ) return True
#------------------------------------------------------------------------------ # mask_bits_14_and_15 #------------------------------------------------------------------------------
[docs] def check_normal_mode(self): if (self.lastutc is not None): tmin = datetime.timedelta(milliseconds = 800) tmax = datetime.timedelta(seconds=1, milliseconds=200) dt = self.utc - self.lastutc ok1 = (dt > tmin) and (dt < tmax) if not ok1: g_logger.warning('Image at %s, the time gap (%s) between successive images was outside the range 0.8 to 1.2 seconds, data are accepted but note that current calibration code only supports 1 Hz acquisition rate', str(self.utc), str(dt)) # print( 'Image at %s, the time gap (%s) between successive images was outside the range 0.8 to 1.2 seconds, code currently only support 1 Hz'%( str(self.utc), str(dt))) self.lastutc = self.utc minval = np.min(self.l0image) ok = (minval > 1500) if not ok: g_logger.warning(" Image at %s, L0_to_L1A_Pipeline:check_normal_mode failed as the minval, %d, is less than 1500", str(self.utc), int(minval)) return ok
#------------------------------------------------------------------------------ # initialize_readout_error #------------------------------------------------------------------------------
[docs] def initialize_readout_error(self): electron_noise = self.config["cdb"]["detector_readout_noise"] Ne = self.config["cdb"]["electrons_per_DN"] err = electron_noise/Ne self.l1error = np.zeros( self.l1image.shape ) + (err*err) # get the square of the error. return True
#------------------------------------------------------------------------------ # image_jitter_validation #------------------------------------------------------------------------------
[docs] def image_jitter_validation(self): #print("jitter test not yet implemented") return True
#------------------------------------------------------------------------------ # mask_saturated_and_bad_pixels #------------------------------------------------------------------------------
[docs] def mask_saturated_and_bad_pixels(self): """ sets known bad pixels to Nan. Bad pixels are read in from the calibration database. Note they are 1 based indices. :return: """ maxy = self.l1image.shape[0] - 1 for index in self.standard_bad_pixels: ix = index[0] -1 # type: int iy = index[1] -1 # type: int self.l1image[ iy,ix ] = math.nan for index in self.standard_big_badpixels: ix0 = index[0] - 1 # type: int ix1 = index[1] iy0 = index[2] - 1 # type: int iy1 = index[3] self.l1image[iy0:iy1, ix0:ix1] = math.nan saturatedpix = np.where( self.l1image >= self.config['cdb']['dn_saturated_value']) self.l1image[saturatedpix] = math.nan return True
#------------------------------------------------------------------------------ # apply_shot_noise #------------------------------------------------------------------------------
[docs] def apply_shot_noise(self): Ne = self.config["cdb"]["electrons_per_DN"] self.l1error += self.l1image/Ne return True
#------------------------------------------------------------------------------ # remove_dark_current #------------------------------------------------------------------------------
[docs] def remove_dark_current(self): dark, error, ok = self.darkcurent.fetch_dark_current( self.exposure_time, self.fpa_temp, self.utc) if (ok): self.l1image -= dark self.l1error += (error*error) return ok
#------------------------------------------------------------------------------ # sub_windowed_array #------------------------------------------------------------------------------
[docs] def sub_windowed_array(self, userarray:np.ndarray): ix0 = self.sub_window[0]-1 ix1 = self.sub_window[1] #-1+1 iy0 = self.sub_window[2]-1 iy1 = self.sub_window[3] #-1+1 if userarray.ndim == 1: sarray = userarray[ iy0:iy1] else: sarray = userarray[ iy0:iy1, ix0:ix1 ] return sarray
#------------------------------------------------------------------------------ # sub_window_selection #------------------------------------------------------------------------------
[docs] def sub_window_selection(self): self.l1image = self.sub_windowed_array( self.l1image ) self.l1error = self.sub_windowed_array( self.l1error ) return True
#------------------------------------------------------------------------------ # nuc_correction #------------------------------------------------------------------------------
[docs] def nuc_correction(self): nuc, error,ok = self.nuc.fetch_nuc( self.fpa_temp, self.utc) if (ok): nuc = self.sub_windowed_array(nuc) error = self.sub_windowed_array( error) y = 1.0/nuc # Get the inverse of the NUC dy = error/(nuc*nuc) # Get the error in the inverse of the NUC self.l1image *= y # apply the NUC correction self.l1error *= (y*y) # and apply the (square of the) error propagation self.l1error += self.l1image*(dy*dy) # do the full error propagation return ok
#------------------------------------------------------------------------------ # flat_field_correction #------------------------------------------------------------------------------
[docs] def flat_field_correction(self): ff, error,ok = self.flatfield.fetch_flat_field( self.fpa_temp, self.utc) if (ok): ff = self.sub_windowed_array(ff) error = self.sub_windowed_array(error) y = 1.0/ff # Get the inverse of the flat-field dy = error/(ff*ff) # Get the error in the inverse of the flat field self.l1image *= y # apply the flat-field correction self.l1error *= (y*y) # and apply the (square of the) error propagation self.l1error += self.l1image*(dy*dy) # do the full error propagation return ok
#------------------------------------------------------------------------------ # sqrt_error #------------------------------------------------------------------------------
[docs] def sqrt_error(self): self.l1error = np.sqrt( self.l1error) bad = np.where( np.isnan(self.l1error) ) if len(bad[0]) > 0: print('There are bad error values in the sub-windowed results', bad) return True
#------------------------------------------------------------------------------ # interpolate_small_pixels #------------------------------------------------------------------------------
[docs] def interpolate_small_pixels(self): ix0 = self.sub_window[0]-1 iy0 = self.sub_window[2]-1 maxy = self.l1image.shape[0] - 1 bad = np.where( np.isnan(self.l1image) ) nump = len( bad[0] ) for i in range( nump ): ix = bad[1][i] # type: int iy = bad[0][i] # type: int if (iy > 0) and (iy < maxy): self.l1image[iy, ix] = 0.5 * (self.l1image[iy - 1, ix] + self.l1image[iy + 1, ix]) # Interpolate across the pixel self.l1error[iy, ix] = self.l1error[iy - 1, ix] + self.l1error[iy + 1, ix] else: nexty = 1 if (iy == 0) else maxy-1 self.l1image[iy, ix] = self.l1image[nexty, ix] self.l1error[iy, ix] = self.l1error[nexty, ix] return True
#------------------------------------------------------------------------------ # interpolate_big_pixels #------------------------------------------------------------------------------
[docs] def interpolate_big_pixels(self): ix0 = self.sub_window[0]-1 iy0 = self.sub_window[2]-1 maxy = self.l1image.shape[0] maxx = self.l1image.shape[1] for index in self.standard_big_badpixels: xl = index[0] - ix0 -1 xr = index[1] - ix0 #-1+1 yb = index[2] - iy0 -1 yt = index[3] - iy0 #-1+1 inwindow = (xl > 0) and (xr < maxx) and ( yb > 0) and (yt < maxy) if inwindow: y = np.arange(yb,yt) for i in range(xl,xr): self.l1image[yb:yt, i] = np.interp( y, [yb-1, yt], [self.l1image[yb-1,i], self.l1image[yt,i]] ) self.l1error[yb:yt, i] = self.l1error[yb-1,i] + self.l1error[yt,i] else: if (xl >= 0) and (xr <= maxx) and (yb >= 0) and (yt <= maxy): logging.warning('interpolate_big_pixels, cannot handle big pixels that lie on the boundary of the sub-window' ) return True
#------------------------------------------------------------------------------ # interpolate_bad_pixels #------------------------------------------------------------------------------
[docs] def interpolate_bad_pixels(self): ok2 = self.interpolate_big_pixels() ok1 = self.interpolate_small_pixels() return ok1 and ok2
#------------------------------------------------------------------------------ # normalize_integrationtime #------------------------------------------------------------------------------
[docs] def normalize_integrationtime(self): factor = 1000000.0/self.exposure_time self.l1image *= factor self.l1error *= factor return True
#------------------------------------------------------------------------------ # process_level0_to_Level1a #------------------------------------------------------------------------------
[docs]def process_level0_to_level1a( level0_filename : str, level1_basedir : str, instrument_name, groups=None, version : Tuple[int,int,int] = None): g_logger.info('Processing <%s> from Level 0 to Level 1A directory <%s>'%(level0_filename,level1_basedir) ) print('Processing <%s> from Level 0 to Level 1A directory <%s>' % (level0_filename, level1_basedir)) if (groups is None): groups = argcommon.netcdfutils.groups_in_file(level0_filename) with L0_to_L1A_Pipeline(instrument_name, version) as pipeline: for group in groups: output = SHOW_Level1A_Output(level1_basedir, group, pipeline) gname = group numgood = 0 numbad = 0 with SHOWLevel0( instrument_name, level0_filename, gname ) as l0: n = l0.numrecords() if (n > 0): g_logger.info('Group <%s>: %d level0 records found', str(gname), n) print('Group <%s>: %d level0 records found'%( str(gname), n)) pipeline.reset_datecheck() for i in range(n): ok = pipeline.reset_pipeline( l0, i) ok = ok and pipeline.process_pipeline() if ok: numgood += 1 output.write_record(pipeline) else: numbad += 1 # print('Read in record ', i, ' of ', n, ' from group ', group) g_logger.info('Group <%s>: Total %d records processed. %d good, %d bad',str(gname), n, numgood, numbad) print('Group <%s>: %d records processed. %d good, %d bad'%(str(gname), n, numgood, numbad)) print('Finished processing groups')