from typing import Pattern, List, Dict, Tuple, Any, NamedTuple, Union
import numpy as np
import datetime
import re
import math
import logging
import os
import os.path
from show_config.show_configuration import Parameters
from showcdb.showcdb import SHOWCDB_TransmissionFilter
import argcommon.netcdfutils
from .showlevel1a import SHOWL1AEntry, SHOWLevel1ACollection
from .showlevel1b import SHOWL1BEntry
from showattitude.iwg1 import IWG1Collection
from .l1b_io import SHOW_Level1B_Output
from ..algorithms.l1b_algorithms import rms_frequency_error_from_rms_spatial_error, interferogram_to_spectrum
g_logger = logging.getLogger("Level 1")
#------------------------------------------------------------------------------
# class L1A_to_L1B_Pipeline
#------------------------------------------------------------------------------
class L1A_to_L1B_Pipeline():
    """
    Pipeline that turns one SHOW Level 1A record into one Level 1B record.

    Typical use is as a context manager: entering opens the transmission
    filter calibration object, leaving closes it. For each Level 1A record
    call :meth:`reset_pipeline`, then :meth:`process_pipeline`, and finally
    :meth:`Level1B` to collect the result.
    """

    #------------------------------------------------------------------------------
    #           L1A_to_L1B_Pipeline::__init__
    #------------------------------------------------------------------------------
    def __init__(self, instrumentname: str, iwg1_filename: str, version: Tuple[int, int, int]):
        """
        :param instrumentname: Instrument name; passed to ``Parameters`` and to the transmission filter.
        :param iwg1_filename: File handed to ``IWG1Collection`` as its ``filename``.
        :param version: Version tuple stored in every Level 1B entry that is produced.
        """
        self.instrumentname = instrumentname                            # type: str
        self.parameters = Parameters(instrumentname)                    # type: Parameters
        self.iwg1collection = IWG1Collection(filename=iwg1_filename)    # type: IWG1Collection
        self.l1a = None                                                 # type: SHOWL1AEntry
        self.version = version
        # Defined up front so close_pipeline() is safe even if open_pipeline() was never called.
        self.filter = None
        self.iwg1entry = None

    #------------------------------------------------------------------------------
    #           open_pipeline
    #------------------------------------------------------------------------------
    def open_pipeline(self):
        """
        Create the transmission filter object used by :meth:`apply_filter_transmission`.

        :return: ``self``
        """
        self.filter = SHOWCDB_TransmissionFilter(self.instrumentname, self.parameters.config)
        return self

    #------------------------------------------------------------------------------
    #           close_pipeline
    #------------------------------------------------------------------------------
    def close_pipeline(self):
        """
        Close the transmission filter object if one is open. Safe to call repeatedly.
        """
        if self.filter is not None:
            self.filter.close()
        self.filter = None

    #------------------------------------------------------------------------------
    #           __enter__ for use with the 'with' statement
    #------------------------------------------------------------------------------
    def __enter__(self):
        self.open_pipeline()
        return self

    #------------------------------------------------------------------------------
    #           __exit__ for use with the 'with' statement
    #------------------------------------------------------------------------------
    def __exit__(self, type, value, traceback):
        # Returns None, so an exception raised inside the 'with' body still propagates.
        self.close_pipeline()

    #------------------------------------------------------------------------------
    #           L1A_to_L1B_Pipeline::reset_pipeline
    #------------------------------------------------------------------------------
    def reset_pipeline(self, l1a: "SHOWL1AEntry") -> bool:
        """
        Reset the processing pipeline so it can process another Level 1A record.

        Copies the time, height rows, exposure time and temperatures from the
        record, looks up the IWG1 entry for the record time and clears every
        product computed by :meth:`process_pipeline`.

        :param l1a: The Level 1A record to be processed next.
        :return: Always ``True``.
        """
        # NOTE(review): the interpolation result is not checked here; confirm how
        # IWG1Collection.interpolate signals a failed lookup (gap > 5 s, out of range).
        self.iwg1entry = self.iwg1collection.interpolate(l1a.time, extrapolate=False, truncate=False, maxgap_seconds=5.0)
        self.l1a = l1a
        self.time = l1a.time
        self.heightrow = l1a.heightrow
        self.wavelength = None
        self.exposure_time = l1a.exposure_time
        self.temperatures = l1a.temperatures
        self.spectrum = None
        self.phase = None
        self.error = None
        self.avg_signal = None
        self.locationxyz = None
        self.pixelrow_lookxyz = None
        self.pixelrow_pitch_offset = None
        return True

    #------------------------------------------------------------------------------
    #           process_pipeline
    #------------------------------------------------------------------------------
    def process_pipeline(self) -> bool:
        """
        Apply the Level 1A to Level 1B pipeline to the current Level 1A record.

        The stages run in order and processing stops at the first stage that
        reports failure. The wavelength scale needs the spectrum shape and the
        filter correction divides the spectrum, so both follow the transform.

        :return: ``True`` if every stage succeeded.
        """
        stages = (self.attitude_solution,
                  self.make_total_power,
                  self.interferogram_to_spectrum,
                  self.generate_wavelength_scale,
                  self.apply_filter_transmission)
        ok = True
        for stage in stages:
            ok = ok and stage()         # short-circuits: later stages are skipped after a failure
        return ok

    # ------------------------------------------------------------------------------
    #           attitude_solution
    # ------------------------------------------------------------------------------
    def attitude_solution(self):
        """
        Compute the platform location and one look vector per image row.

        The pitch of each row is ``(heightrow - bore_sight_pixel) * height_degrees_per_pixel
        + boresight_pitchangle`` in degrees. The look vector is the aircraft nose
        vector rotated by that pitch towards the "up" direction (minus the IWG1
        ``down`` vector): ``cos(pitch)*nose + sin(pitch)*up``.

        :return: Always ``True``.
        """
        numheights = self.l1a.image.shape[0]
        self.locationxyz = np.zeros([3])
        self.pixelrow_lookxyz = np.zeros([numheights, 3])
        self.pixelrow_pitch_offset = np.zeros([numheights])

        cdb = self.parameters.config["cdb"]
        showpitchangle = cdb["boresight_pitchangle"]
        boresightpix = cdb["bore_sight_pixel"]
        degreesperpix = cdb["height_degrees_per_pixel"]

        pitch = (self.heightrow - boresightpix) * degreesperpix + showpitchangle
        pitchrad = np.reshape(np.deg2rad(pitch), (-1, 1))       # column vector so it broadcasts against the [1,3] axes
        x = np.reshape(self.iwg1entry.nose, [1, 3])
        z = -np.reshape(self.iwg1entry.down, [1, 3])

        self.locationxyz[:] = self.iwg1entry.location
        # The vertical component must use sin(pitch); using cos for both terms gives a
        # non-unit vector whose direction does not depend on the pitch at all.
        self.pixelrow_lookxyz[:, :] = np.cos(pitchrad) * x + np.sin(pitchrad) * z
        self.pixelrow_pitch_offset[:] = pitch
        return True

    #------------------------------------------------------------------------------
    #           make_total_power
    #------------------------------------------------------------------------------
    def make_total_power(self):
        """
        Store the mean of each Level 1A image row (average along axis 1) in ``avg_signal``.

        :return: Always ``True``.
        """
        self.avg_signal = np.average(self.l1a.image, axis=1)
        return True

    #------------------------------------------------------------------------------
    #           generate_wavelength_scale
    #------------------------------------------------------------------------------
    def generate_wavelength_scale(self):
        """
        Generate the wavelength scale. This must occur after the spectrum has
        been generated because the number of points is taken from its shape.

        :return: Always ``True``.
        """
        nw = self.spectrum.shape[1]             # number of wavelengths/frequencies
        shs_temp = self.temperatures[2]         # temperature of the SHS
        self.wavelength = self.parameters.nominal_wavelengths(nw, shs_temperature_celsius=shs_temp)
        return True

    #------------------------------------------------------------------------------
    #           interferogram_to_spectrum
    #------------------------------------------------------------------------------
    def interferogram_to_spectrum(self):
        """
        Convert the Level 1A interferogram and its error into a spectrum, error and phase.

        :return: Always ``True``.
        """
        # Inside a method the bare name resolves to the module-level function imported
        # from l1b_algorithms, not to this method. The fourth return value is not used.
        self.spectrum, self.error, self.phase, _ = interferogram_to_spectrum(self.l1a.image, self.l1a.error)
        return True

    #------------------------------------------------------------------------------
    #           apply_filter_transmission
    #------------------------------------------------------------------------------
    def apply_filter_transmission(self):
        """
        Divide the spectrum and its error by the filter transmission of the current height rows.

        :return: The ``ok`` flag returned by the filter object; ``False`` means no
                 correction was applied and a warning was logged.
        """
        transmission, _transmission_error, ok = self.filter.fetch_filter()
        if ok:
            transmission = transmission[self.heightrow, :]
            # NOTE(review): the uncertainty of the filter itself is fetched but is not
            # folded into self.error; only the scaling by the transmission is applied.
            self.spectrum /= transmission
            self.error /= transmission
        else:
            g_logger.warning('Error applying transmission filter correction. Could not find filter calibration file')
        return ok

    #------------------------------------------------------------------------------
    #           Level1B
    #------------------------------------------------------------------------------
    def Level1B(self):
        """
        Package the products of the current record into a ``SHOWL1BEntry``.

        :return: The new ``SHOWL1BEntry``.
        """
        iwg1 = self.iwg1entry
        L1BEntry = SHOWL1BEntry(time=self.time,
                                heightrow=self.heightrow,
                                wavelength=self.wavelength,
                                exposure_time=self.exposure_time,
                                temperatures=self.temperatures,
                                sensor_names=self.l1a.sensor_names,
                                spectrum=self.spectrum,
                                phase=self.phase,
                                error=self.error,
                                avg_signal=self.avg_signal,
                                locationxyz=self.locationxyz,
                                pixelrow_lookxyz=self.pixelrow_lookxyz,
                                pixelrow_pitch_offset=self.pixelrow_pitch_offset,
                                # builtin str: the np.str alias was removed from NumPy in 1.24
                                aircraft_iwg1_names=np.array(["latitude", "longitude", "altitude", "pitch", "roll", "heading"], dtype=str),
                                aircraft_iwg1=np.array([iwg1.latitude, iwg1.longitude, iwg1.altitude, iwg1.pitch, iwg1.roll, iwg1.heading]),
                                aircraft_nose=iwg1.nose,
                                aircraft_starboard=iwg1.starboard,
                                aircraft_wheels=iwg1.wheels,
                                version=self.version
                                )
        return L1BEntry
#------------------------------------------------------------------------------
# process_level1a_to_level1b
#------------------------------------------------------------------------------
def process_level1a_to_level1b(
    instrument_name: str,
    group: str,
    starttime: datetime.datetime,
    endtime: datetime.datetime,
    iwg1_filename: str,
    level1b_basedir: str = None,
    level1a_basedir: str = None,
    level1a_version: str = None,
    level1b_version: Tuple[int, int, int] = None,
):
    """
    Process the Level 1A records of one group in a time range and write Level 1B records.

    Each record loaded from the Level 1A collection is run through an
    ``L1A_to_L1B_Pipeline``. Records that pass are written with
    ``SHOW_Level1B_Output.write_record``; the rest are only counted.
    Progress is reported both to the module logger and to stdout.

    :param instrument_name: Instrument name passed to the pipeline, the collection and the writer.
    :param group: Group passed to the Level 1A collection and the Level 1B writer.
    :param starttime: Start of the time range passed to the collection's ``load``.
    :param endtime: End of the time range passed to the collection's ``load``.
    :param iwg1_filename: IWG1 file passed to the pipeline.
    :param level1b_basedir: ``basedir`` for the Level 1B writer.
    :param level1a_basedir: ``basedir`` for the Level 1A collection.
    :param level1a_version: ``versionstr`` for the Level 1A collection.
    :param level1b_version: Version passed to the pipeline and the Level 1B writer.
    :return: None
    """
    # The same text goes to the log and to stdout, so build it once.
    banner = 'Processing records from Level 1A to Level 1B for group %s from %s to %s' % (str(group), str(starttime), str(endtime))
    g_logger.info(banner)
    print(banner)

    with L1A_to_L1B_Pipeline(instrument_name, iwg1_filename, level1b_version) as pipeline:
        # NOTE(review): the writer is never explicitly closed in this function;
        # confirm that SHOW_Level1B_Output does not need an explicit close/flush.
        writer = SHOW_Level1B_Output(instrument_name, group, level1b_version, basedir=level1b_basedir)
        group_label = str(group)
        accepted = 0
        rejected = 0

        with SHOWLevel1ACollection(instrument_name, group, basedir=level1a_basedir, versionstr=level1a_version) as records:
            records.load(starttime, endtime)
            total = records.numrecords()

            if total > 0:
                g_logger.info('Group <%s>: %d level 1A records found', group_label, total)
                print('Group <%s>: %d level1A records found' % (group_label, total))

                for record in records:
                    # process_pipeline only runs when reset_pipeline reported success
                    if pipeline.reset_pipeline(record) and pipeline.process_pipeline():
                        accepted += 1
                        writer.write_record(pipeline.Level1B())
                    else:
                        rejected += 1

            # The summary is reported even when no records were found.
            g_logger.info('Group <%s>: Total %d records processed. %d good, %d bad', group_label, total, accepted, rejected)
            print('Group <%s>: %d records processed. %d good, %d bad' % (group_label, total, accepted, rejected))

    print('Finished processing groups')