from typing import Pattern, List, Dict, Tuple, Any, NamedTuple, Union
import numpy as np
import datetime
import re
import math
import logging
import os
import os.path
from show_config.show_configuration import Show_Configuration, read_configuration, cdb_directory
from showcdb.showcdb import SHOWCDB_DarkCurrent, SHOWCDB_DCBias, SHOWCDB_NUC, SHOWCDB_FlatField
import argcommon.netcdfutils
from ..level0.showlevel0 import SHOWLevel0
from .l1a_io import SHOW_Level1A_Output
g_logger = logging.getLogger("Level 0")
#------------------------------------------------------------------------------
# class L0_to_L1A_Pipeline
#------------------------------------------------------------------------------
[docs]class L0_to_L1A_Pipeline():
EXCEPTION_FLAG_BAD_PIXEL = 0x0001
EXCEPTION_FLAG_SATURATED_PIXEL = 0x0002
#------------------------------------------------------------------------------
# L0_to_L1A_Pipeline::__init__
#------------------------------------------------------------------------------
def __init__(self, instrumentname : str, version: Tuple[int,int,int]):
self.lastutc = datetime.datetime(1980,1,1)
self.instrumentname = instrumentname # type: str
self.versionmajor = version[0]
self.versionminor = version[1]
self.versiondetail = version[2]
self.config = read_configuration( instrumentname) # type: Show_Configuration
self.l0image = None # type: np.ndarray
self.temperatures = None # type: np.ndarray
self.fpa_temp = None # type: float
self.utc = None # type: datetime.datetime
self.temp_labels = None # type: List [ str ]
self.l1image = None # type: np.ndarray
self.l1error = None # type: np.ndarray
self.standard_bad_pixels = None # type: List[ Tuple[int,int] ]
self.standard_big_badpixels= None # type: List[ Tuple[ int, int, int, int]]
self.jitter_pixels = None # type: List[ Tuple[int,int] ]
self.dcbias = None # type: SHOWCDB_DCBias
self.darkcurent = None # type: SHOWCDB_DarkCurrent
self.nuc = None # type: SHOWCDB_NUC
self.flatfield = None # type: SHOWCDB_FlatField
self.sub_window = [ self.config["iFOV"]["Pixel_column_at_left_edge_of_grating_image"],
self.config["iFOV"]["Pixel_column_at_right_edge_of_grating_image"],
self.config["iFOV"]["Pixel_row_at_bottom_of_atmospheric_image"],
self.config["iFOV"]["Pixel_row_at_top_edge_of_atmospheric_image"]] # type: Tuple[int, int, int, int]
#------------------------------------------------------------------------------
# reset_datecheck
#------------------------------------------------------------------------------
[docs] def reset_datecheck(self):
"""
Reset the internal flag used to detect data not collected at 1 Hz frame rate
:return:
"""
self.lastutc = None
#------------------------------------------------------------------------------
# open_pipeline
#------------------------------------------------------------------------------
[docs] def open_pipeline(self):
# self.dcbias = SHOWCDB_DCBias ( self.instrumentname, self.config )
self.darkcurent = SHOWCDB_DarkCurrent( self.instrumentname, self.config )
self.nuc = SHOWCDB_NUC ( self.instrumentname, self.config )
self.flatfield = SHOWCDB_FlatField ( self.instrumentname, self.config )
self.load_standard_bad_pixels( self.config)
self.load_jitter_pixels(self.config)
return self
#------------------------------------------------------------------------------
# close_pipeline
#------------------------------------------------------------------------------
[docs] def close_pipeline(self):
# if self.dcbias is not None: self.dcbias.close()
if self.darkcurent is not None: self.darkcurent.close()
if self.nuc is not None: self.nuc.close()
if self.flatfield is not None: self.flatfield.close()
self.dcbias = None
self.darkcurent = None
self.nuc = None
self.flatfield = None
#------------------------------------------------------------------------------
# __enter__ for use with the 'with' statement
#------------------------------------------------------------------------------
def __enter__(self):
self.open_pipeline()
return self
#------------------------------------------------------------------------------
# __exit__ for use with the 'with' statement
#------------------------------------------------------------------------------
def __exit__(self, type, value, traceback):
self.close_pipeline()
#------------------------------------------------------------------------------
# L0_to_L1A_Pipeline::reset_pipeline
#------------------------------------------------------------------------------
[docs] def reset_pipeline(self, l0:SHOWLevel0, i : int) -> bool:
"""
Reset the processing pipeline so it can process another image
:param l0:
:param i:
:return:
"""
self.l0image = l0.image_at(i,dtype=np.uint16)
self.temperatures = np.array(l0.temperatures[i, :])
self.fpa_temp = self.temperatures[0]
self.utc = datetime.datetime.utcfromtimestamp( l0.time[i:i+1].values[0].astype('O')/1e9)
self.exposure_time = l0.exposure_time[i:i+1].values[0]
self.temp_labels = np.array(l0.temperature_labels)
self.l1image = None
self.l1error = None
return True
#------------------------------------------------------------------------------
# load_standard_bad_pixels
#------------------------------------------------------------------------------
[docs] def load_list_of_pixels(self, filename):
"""
Load a list of pixel indices from a simple text file
:return:
"""
try:
f = open(filename, 'rt')
lines = f.readlines()
f.close()
except:
g_logger.warning("L0_to_L1A_Pipeline:load_list_of_pixels, error reading list of pixels from file %s", filename)
lines = []
pixels_list = []
ix = []
iy = []
for l in lines:
m = re.match( r'(\d+)[ ,]+(\d+)', l )
if (m is not None):
iy.append( int(m.group(1)) )
ix.append( int(m.group(2)) )
if (len(ix) > 0 ):
pixels_list.append( iy )
pixels_list.append( ix )
return pixels_list
#------------------------------------------------------------------------------
# load_standard_bad_pixels
#------------------------------------------------------------------------------
[docs] def load_standard_bad_pixels(self, config):
"""
Load the standard bad pixels from the calibration dtaabase. This is just a simple text file
:return:
"""
self.standard_bad_pixels = config['badpixels']['small']
self.standard_big_badpixels = config['badpixels']['big']
#------------------------------------------------------------------------------
# load_jitter_pixels
#------------------------------------------------------------------------------
[docs] def load_jitter_pixels(self, config):
"""
Load the standard bad pixels from the calibration dtaabase. This is just a simple text file
:return:
"""
filename = cdb_directory(config)
self.jitter_pixels = [] #self.load_list_of_pixels( filename)
#------------------------------------------------------------------------------
# process_pipeline
#------------------------------------------------------------------------------
[docs] def process_pipeline(self) -> bool:
"""
Apply the Level 0 to Level 1A pipeline to the current level 0 image
:return:
"""
ok = True
ok = ok and self.mask_bits_14_and_15()
ok = ok and self.check_normal_mode()
ok = ok and self.initialize_readout_error()
# ok = ok and self.image_jitter_validation()
ok = ok and self.remove_dark_current()
ok = ok and self.apply_shot_noise()
ok = ok and self.mask_saturated_and_bad_pixels()
ok = ok and self.sub_window_selection()
ok = ok and self.nuc_correction()
ok = ok and self.flat_field_correction()
ok = ok and self.interpolate_bad_pixels()
ok = ok and self.sqrt_error()
ok = ok and self.normalize_integrationtime()
# if (not ok):
# g_logger.warning("L0_to_L1A_Pipeline:process_pipeline, rejecting record collected at %s", self.utc.strftime('%Y-%m-%d %H:%M:%S.%f') )
return ok
#------------------------------------------------------------------------------
# mask_bits_14_and_15
#------------------------------------------------------------------------------
[docs] def mask_bits_14_and_15(self):
self.l1image = self.l0image
self.l1image &= 0x3FFF
self.l1image = self.l1image.astype( np.float )
return True
#------------------------------------------------------------------------------
# mask_bits_14_and_15
#------------------------------------------------------------------------------
[docs] def check_normal_mode(self):
if (self.lastutc is not None):
tmin = datetime.timedelta(milliseconds = 800)
tmax = datetime.timedelta(seconds=1, milliseconds=200)
dt = self.utc - self.lastutc
ok1 = (dt > tmin) and (dt < tmax)
if not ok1:
g_logger.warning('Image at %s, the time gap (%s) between successive images was outside the range 0.8 to 1.2 seconds, data are accepted but note that current calibration code only supports 1 Hz acquisition rate', str(self.utc), str(dt))
# print( 'Image at %s, the time gap (%s) between successive images was outside the range 0.8 to 1.2 seconds, code currently only support 1 Hz'%( str(self.utc), str(dt)))
self.lastutc = self.utc
minval = np.min(self.l0image)
ok = (minval > 1500)
if not ok:
g_logger.warning(" Image at %s, L0_to_L1A_Pipeline:check_normal_mode failed as the minval, %d, is less than 1500", str(self.utc), int(minval))
return ok
#------------------------------------------------------------------------------
# initialize_readout_error
#------------------------------------------------------------------------------
[docs] def initialize_readout_error(self):
electron_noise = self.config["cdb"]["detector_readout_noise"]
Ne = self.config["cdb"]["electrons_per_DN"]
err = electron_noise/Ne
self.l1error = np.zeros( self.l1image.shape ) + (err*err) # get the square of the error.
return True
#------------------------------------------------------------------------------
# image_jitter_validation
#------------------------------------------------------------------------------
[docs] def image_jitter_validation(self):
#print("jitter test not yet implemented")
return True
#------------------------------------------------------------------------------
# mask_saturated_and_bad_pixels
#------------------------------------------------------------------------------
[docs] def mask_saturated_and_bad_pixels(self):
"""
sets known bad pixels to Nan. Bad pixels are read in from the calibration database. Note they are 1 based indices.
:return:
"""
maxy = self.l1image.shape[0] - 1
for index in self.standard_bad_pixels:
ix = index[0] -1 # type: int
iy = index[1] -1 # type: int
self.l1image[ iy,ix ] = math.nan
for index in self.standard_big_badpixels:
ix0 = index[0] - 1 # type: int
ix1 = index[1]
iy0 = index[2] - 1 # type: int
iy1 = index[3]
self.l1image[iy0:iy1, ix0:ix1] = math.nan
saturatedpix = np.where( self.l1image >= self.config['cdb']['dn_saturated_value'])
self.l1image[saturatedpix] = math.nan
return True
#------------------------------------------------------------------------------
# apply_shot_noise
#------------------------------------------------------------------------------
[docs] def apply_shot_noise(self):
Ne = self.config["cdb"]["electrons_per_DN"]
self.l1error += self.l1image/Ne
return True
#------------------------------------------------------------------------------
# remove_dark_current
#------------------------------------------------------------------------------
[docs] def remove_dark_current(self):
dark, error, ok = self.darkcurent.fetch_dark_current( self.exposure_time, self.fpa_temp, self.utc)
if (ok):
self.l1image -= dark
self.l1error += (error*error)
return ok
#------------------------------------------------------------------------------
# sub_windowed_array
#------------------------------------------------------------------------------
[docs] def sub_windowed_array(self, userarray:np.ndarray):
ix0 = self.sub_window[0]-1
ix1 = self.sub_window[1] #-1+1
iy0 = self.sub_window[2]-1
iy1 = self.sub_window[3] #-1+1
if userarray.ndim == 1:
sarray = userarray[ iy0:iy1]
else:
sarray = userarray[ iy0:iy1, ix0:ix1 ]
return sarray
#------------------------------------------------------------------------------
# sub_window_selection
#------------------------------------------------------------------------------
[docs] def sub_window_selection(self):
self.l1image = self.sub_windowed_array( self.l1image )
self.l1error = self.sub_windowed_array( self.l1error )
return True
#------------------------------------------------------------------------------
# nuc_correction
#------------------------------------------------------------------------------
[docs] def nuc_correction(self):
nuc, error,ok = self.nuc.fetch_nuc( self.fpa_temp, self.utc)
if (ok):
nuc = self.sub_windowed_array(nuc)
error = self.sub_windowed_array( error)
y = 1.0/nuc # Get the inverse of the NUC
dy = error/(nuc*nuc) # Get the error in the inverse of the NUC
self.l1image *= y # apply the NUC correction
self.l1error *= (y*y) # and apply the (square of the) error propagation
self.l1error += self.l1image*(dy*dy) # do the full error propagation
return ok
#------------------------------------------------------------------------------
# flat_field_correction
#------------------------------------------------------------------------------
[docs] def flat_field_correction(self):
ff, error,ok = self.flatfield.fetch_flat_field( self.fpa_temp, self.utc)
if (ok):
ff = self.sub_windowed_array(ff)
error = self.sub_windowed_array(error)
y = 1.0/ff # Get the inverse of the flat-field
dy = error/(ff*ff) # Get the error in the inverse of the flat field
self.l1image *= y # apply the flat-field correction
self.l1error *= (y*y) # and apply the (square of the) error propagation
self.l1error += self.l1image*(dy*dy) # do the full error propagation
return ok
#------------------------------------------------------------------------------
# sqrt_error
#------------------------------------------------------------------------------
[docs] def sqrt_error(self):
self.l1error = np.sqrt( self.l1error)
bad = np.where( np.isnan(self.l1error) )
if len(bad[0]) > 0:
print('There are bad error values in the sub-windowed results', bad)
return True
#------------------------------------------------------------------------------
# interpolate_small_pixels
#------------------------------------------------------------------------------
[docs] def interpolate_small_pixels(self):
ix0 = self.sub_window[0]-1
iy0 = self.sub_window[2]-1
maxy = self.l1image.shape[0] - 1
bad = np.where( np.isnan(self.l1image) )
nump = len( bad[0] )
for i in range( nump ):
ix = bad[1][i] # type: int
iy = bad[0][i] # type: int
if (iy > 0) and (iy < maxy):
self.l1image[iy, ix] = 0.5 * (self.l1image[iy - 1, ix] + self.l1image[iy + 1, ix]) # Interpolate across the pixel
self.l1error[iy, ix] = self.l1error[iy - 1, ix] + self.l1error[iy + 1, ix]
else:
nexty = 1 if (iy == 0) else maxy-1
self.l1image[iy, ix] = self.l1image[nexty, ix]
self.l1error[iy, ix] = self.l1error[nexty, ix]
return True
#------------------------------------------------------------------------------
# interpolate_big_pixels
#------------------------------------------------------------------------------
[docs] def interpolate_big_pixels(self):
ix0 = self.sub_window[0]-1
iy0 = self.sub_window[2]-1
maxy = self.l1image.shape[0]
maxx = self.l1image.shape[1]
for index in self.standard_big_badpixels:
xl = index[0] - ix0 -1
xr = index[1] - ix0 #-1+1
yb = index[2] - iy0 -1
yt = index[3] - iy0 #-1+1
inwindow = (xl > 0) and (xr < maxx) and ( yb > 0) and (yt < maxy)
if inwindow:
y = np.arange(yb,yt)
for i in range(xl,xr):
self.l1image[yb:yt, i] = np.interp( y, [yb-1, yt], [self.l1image[yb-1,i], self.l1image[yt,i]] )
self.l1error[yb:yt, i] = self.l1error[yb-1,i] + self.l1error[yt,i]
else:
if (xl >= 0) and (xr <= maxx) and (yb >= 0) and (yt <= maxy):
logging.warning('interpolate_big_pixels, cannot handle big pixels that lie on the boundary of the sub-window' )
return True
#------------------------------------------------------------------------------
# interpolate_bad_pixels
#------------------------------------------------------------------------------
[docs] def interpolate_bad_pixels(self):
ok2 = self.interpolate_big_pixels()
ok1 = self.interpolate_small_pixels()
return ok1 and ok2
#------------------------------------------------------------------------------
# normalize_integrationtime
#------------------------------------------------------------------------------
[docs] def normalize_integrationtime(self):
factor = 1000000.0/self.exposure_time
self.l1image *= factor
self.l1error *= factor
return True
#------------------------------------------------------------------------------
# process_level0_to_Level1a
#------------------------------------------------------------------------------
[docs]def process_level0_to_level1a( level0_filename : str, level1_basedir : str, instrument_name, groups=None, version : Tuple[int,int,int] = None):
g_logger.info('Processing <%s> from Level 0 to Level 1A directory <%s>'%(level0_filename,level1_basedir) )
print('Processing <%s> from Level 0 to Level 1A directory <%s>' % (level0_filename, level1_basedir))
if (groups is None):
groups = argcommon.netcdfutils.groups_in_file(level0_filename)
with L0_to_L1A_Pipeline(instrument_name, version) as pipeline:
for group in groups:
output = SHOW_Level1A_Output(level1_basedir, group, pipeline)
gname = group
numgood = 0
numbad = 0
with SHOWLevel0( instrument_name, level0_filename, gname ) as l0:
n = l0.numrecords()
if (n > 0):
g_logger.info('Group <%s>: %d level0 records found', str(gname), n)
print('Group <%s>: %d level0 records found'%( str(gname), n))
pipeline.reset_datecheck()
for i in range(n):
ok = pipeline.reset_pipeline( l0, i)
ok = ok and pipeline.process_pipeline()
if ok:
numgood += 1
output.write_record(pipeline)
else: numbad += 1
# print('Read in record ', i, ' of ', n, ' from group ', group)
g_logger.info('Group <%s>: Total %d records processed. %d good, %d bad',str(gname), n, numgood, numbad)
print('Group <%s>: %d records processed. %d good, %d bad'%(str(gname), n, numgood, numbad))
print('Finished processing groups')