Source code for showapi.level1.l1a_to_l1b_pipeline

from typing import Pattern, List, Dict, Tuple, Any, NamedTuple, Union
import numpy as np
import datetime
import re
import math
import logging
import os
import os.path
from show_config.show_configuration import Parameters
from showcdb.showcdb import SHOWCDB_TransmissionFilter
import argcommon.netcdfutils
from .showlevel1a import SHOWL1AEntry, SHOWLevel1ACollection
from .showlevel1b import SHOWL1BEntry
from showattitude.iwg1 import IWG1Collection
from .l1b_io import SHOW_Level1B_Output
from ..algorithms.l1b_algorithms import rms_frequency_error_from_rms_spatial_error, interferogram_to_spectrum


g_logger = logging.getLogger("Level 1")

#------------------------------------------------------------------------------
#           class L1A_to_L1B_Pipeline
#------------------------------------------------------------------------------

class L1A_to_L1B_Pipeline:
    """
    Processing pipeline that converts one SHOW Level 1A record at a time into
    a Level 1B record.

    Typical use is as a context manager::

        with L1A_to_L1B_Pipeline(name, iwg1_filename, version) as pipeline:
            if pipeline.reset_pipeline(l1a) and pipeline.process_pipeline():
                entry = pipeline.Level1B()

    :param instrumentname: Name of the instrument; passed to ``Parameters`` and
        to the transmission filter calibration lookup.
    :param iwg1_filename: File used to build the ``IWG1Collection`` from which
        the aircraft attitude is interpolated.
    :param version: Three-integer version tuple copied into each Level 1B entry.
    """

    #------------------------------------------------------------------------------
    #           L1A_to_L1B_Pipeline::__init__
    #------------------------------------------------------------------------------

    def __init__(self, instrumentname: str, iwg1_filename: str, version: Tuple[int, int, int]):
        self.instrumentname = instrumentname                            # type: str
        self.parameters = Parameters(instrumentname)                    # type: Parameters
        self.iwg1collection = IWG1Collection(filename=iwg1_filename)    # type: IWG1Collection
        self.l1a = None                                                 # type: SHOWL1AEntry
        self.version = version
        # Defined up front so close_pipeline() is safe even if open_pipeline()
        # was never called or failed part way through.
        self.filter = None
        self.iwg1entry = None

    #------------------------------------------------------------------------------
    #           open_pipeline
    #------------------------------------------------------------------------------

    def open_pipeline(self):
        """
        Create the transmission filter calibration object used by
        :meth:`apply_filter_transmission`.

        :return: ``self``, so the call can be chained.
        """
        self.filter = SHOWCDB_TransmissionFilter(self.instrumentname, self.parameters.config)
        return self

    #------------------------------------------------------------------------------
    #           close_pipeline
    #------------------------------------------------------------------------------

    def close_pipeline(self):
        """
        Close the transmission filter object, if one is open, and forget it.
        Calling this more than once, or before :meth:`open_pipeline`, is harmless.
        """
        if self.filter is not None:
            self.filter.close()
            self.filter = None

    #------------------------------------------------------------------------------
    #           __enter__ for use with the 'with' statement
    #------------------------------------------------------------------------------

    def __enter__(self):
        self.open_pipeline()
        return self

    #------------------------------------------------------------------------------
    #           __exit__ for use with the 'with' statement
    #------------------------------------------------------------------------------

    def __exit__(self, type, value, traceback):
        # Returns None, so any exception raised inside the 'with' body propagates.
        self.close_pipeline()

    #------------------------------------------------------------------------------
    #           reset_pipeline
    #------------------------------------------------------------------------------

    def reset_pipeline(self, l1a: "SHOWL1AEntry") -> bool:
        """
        Reset the processing pipeline so it can process another Level 1A record.

        Copies the time, height rows, exposure time and temperatures from the
        record, interpolates the IWG1 attitude to the record time and clears
        every product computed by :meth:`process_pipeline`.

        :param l1a: The Level 1A record to be processed next.
        :return: Always ``True``.
        """
        # NOTE(review): it is not visible here what interpolate() returns when the
        # record time is outside the IWG1 data or inside a gap longer than
        # maxgap_seconds; if it can return None, attitude_solution() will fail.
        self.iwg1entry = self.iwg1collection.interpolate(l1a.time,
                                                         extrapolate=False,
                                                         truncate=False,
                                                         maxgap_seconds=5.0)

        # Values carried over from the Level 1A record.
        self.l1a = l1a
        self.time = l1a.time
        self.heightrow = l1a.heightrow
        self.exposure_time = l1a.exposure_time
        self.temperatures = l1a.temperatures

        # Products that are regenerated by process_pipeline().
        self.wavelength = None
        self.spectrum = None
        self.phase = None
        self.error = None
        self.avg_signal = None
        self.locationxyz = None
        self.pixelrow_lookxyz = None
        self.pixelrow_pitch_offset = None
        return True

    #------------------------------------------------------------------------------
    #           process_pipeline
    #------------------------------------------------------------------------------

    def process_pipeline(self) -> bool:
        """
        Apply the Level 1A to Level 1B pipeline to the record given to
        :meth:`reset_pipeline`.

        The stages run in a fixed order and processing stops at the first stage
        that reports failure.

        :return: Truthy if every stage succeeded, otherwise the falsy status of
            the first failing stage.
        """
        return (self.attitude_solution()
                and self.make_total_power()
                and self.interferogram_to_spectrum()
                and self.generate_wavelength_scale()        # needs self.spectrum
                and self.apply_filter_transmission())

    #------------------------------------------------------------------------------
    #           attitude_solution
    #------------------------------------------------------------------------------

    def attitude_solution(self):
        """
        Compute the platform location and the look vector of each height row.

        The pitch of each row, in degrees, is::

            (heightrow - bore_sight_pixel) * height_degrees_per_pixel + boresight_pitchangle

        using the values stored under ``config["cdb"]``. The look vector of a row
        is the aircraft nose vector rotated by that pitch towards the aircraft
        "up" direction (the negative of the IWG1 ``down`` vector).

        Sets ``locationxyz`` (3), ``pixelrow_lookxyz`` (rows, 3) and
        ``pixelrow_pitch_offset`` (rows), where rows is ``l1a.image.shape[0]``.

        :return: Always ``True``.
        """
        cdb = self.parameters.config["cdb"]
        boresight_pitch = cdb["boresight_pitchangle"]
        boresight_pixel = cdb["bore_sight_pixel"]
        degrees_per_pixel = cdb["height_degrees_per_pixel"]

        num_rows = self.l1a.image.shape[0]
        # Pre-allocated so the slice assignments below fail loudly if the number
        # of height rows does not match the number of image rows.
        self.locationxyz = np.zeros([3])
        self.pixelrow_lookxyz = np.zeros([num_rows, 3])
        self.pixelrow_pitch_offset = np.zeros([num_rows])

        pitch = (self.heightrow - boresight_pixel) * degrees_per_pixel + boresight_pitch
        pitch_rad = np.reshape(np.deg2rad(pitch), (-1, 1))      # column, broadcasts against (1, 3)
        nose = np.reshape(self.iwg1entry.nose, [1, 3])
        up = -np.reshape(self.iwg1entry.down, [1, 3])

        self.locationxyz[:] = self.iwg1entry.location
        # cos/sin pair keeps the result a rotation of the nose vector; using the
        # cosine for both terms would scale (nose + up) instead of rotating.
        self.pixelrow_lookxyz[:, :] = np.cos(pitch_rad) * nose + np.sin(pitch_rad) * up
        self.pixelrow_pitch_offset[:] = pitch
        return True

    #------------------------------------------------------------------------------
    #           make_total_power
    #------------------------------------------------------------------------------

    def make_total_power(self):
        """
        Store the average of each row of the Level 1A image in ``avg_signal``.

        :return: Always ``True``.
        """
        self.avg_signal = np.average(self.l1a.image, axis=1)
        return True

    #------------------------------------------------------------------------------
    #           generate_wavelength_scale
    #------------------------------------------------------------------------------

    def generate_wavelength_scale(self):
        """
        Generate the nominal wavelength scale. This must run after the spectrum
        has been generated because the number of wavelengths is taken from the
        second dimension of ``self.spectrum``.

        :return: Always ``True``.
        """
        num_wavelengths = self.spectrum.shape[1]
        shs_temperature = self.temperatures[2]      # entry 2 is used as the SHS temperature
        self.wavelength = self.parameters.nominal_wavelengths(num_wavelengths,
                                                              shs_temperature_celsius=shs_temperature)
        return True

    #------------------------------------------------------------------------------
    #           interferogram_to_spectrum
    #------------------------------------------------------------------------------

    def interferogram_to_spectrum(self):
        """
        Convert the Level 1A interferogram and its error into a spectrum, error
        and phase using ``interferogram_to_spectrum`` from the l1b_algorithms
        module. The fourth value returned by that function is not used.

        :return: Always ``True``.
        """
        # Inside a method the bare name resolves to the module-level function,
        # not to this method.
        self.spectrum, self.error, self.phase, _ = interferogram_to_spectrum(self.l1a.image,
                                                                             self.l1a.error)
        return True

    #------------------------------------------------------------------------------
    #           apply_filter_transmission
    #------------------------------------------------------------------------------

    def apply_filter_transmission(self):
        """
        Divide the spectrum and its error, in place, by the filter transmission
        of the height rows in this record.

        The error array returned with the filter is currently not used.

        :return: The status flag returned by ``fetch_filter``; a warning is
            logged when it is falsy.
        """
        transmission, _transmission_error, ok = self.filter.fetch_filter()
        if ok:
            transmission = transmission[self.heightrow, :]
            self.spectrum /= transmission
            self.error /= transmission
        else:
            g_logger.warning('Error applying transmission filter correction. Could not find filter calibration file')
        return ok

    #------------------------------------------------------------------------------
    #           Level1B
    #------------------------------------------------------------------------------

    def Level1B(self):
        """
        Package the current state of the pipeline as a Level 1B entry.

        :return: A ``SHOWL1BEntry`` built from the values copied from the Level 1A
            record, the products of :meth:`process_pipeline`, the interpolated
            IWG1 aircraft attitude and the pipeline version.
        """
        iwg1 = self.iwg1entry
        # The order of the names must match the order of the values below.
        # Builtin str is used because the np.str alias no longer exists in NumPy.
        iwg1_names = np.array(["latitude", "longitude", "altitude", "pitch", "roll", "heading"], dtype=str)
        iwg1_values = np.array([iwg1.latitude, iwg1.longitude, iwg1.altitude,
                                iwg1.pitch, iwg1.roll, iwg1.heading])

        return SHOWL1BEntry(time=self.time,
                            heightrow=self.heightrow,
                            wavelength=self.wavelength,
                            exposure_time=self.exposure_time,
                            temperatures=self.temperatures,
                            sensor_names=self.l1a.sensor_names,
                            spectrum=self.spectrum,
                            phase=self.phase,
                            error=self.error,
                            avg_signal=self.avg_signal,
                            locationxyz=self.locationxyz,
                            pixelrow_lookxyz=self.pixelrow_lookxyz,
                            pixelrow_pitch_offset=self.pixelrow_pitch_offset,
                            aircraft_iwg1_names=iwg1_names,
                            aircraft_iwg1=iwg1_values,
                            aircraft_nose=iwg1.nose,
                            aircraft_starboard=iwg1.starboard,
                            aircraft_wheels=iwg1.wheels,
                            version=self.version)
#------------------------------------------------------------------------------
#           process_level1a_to_level1b
#------------------------------------------------------------------------------
def process_level1a_to_level1b(instrument_name: str,
                               group: str,
                               starttime: datetime.datetime,
                               endtime: datetime.datetime,
                               iwg1_filename: str,
                               level1b_basedir: str = None,
                               level1a_basedir: str = None,
                               level1a_version: str = None,
                               level1b_version: Tuple[int, int, int] = None):
    """
    Process the Level 1A records of one group to Level 1B and write the results.

    Every Level 1A record loaded for the time range is passed through an
    ``L1A_to_L1B_Pipeline``. Records for which the pipeline succeeds are written
    with ``SHOW_Level1B_Output.write_record``; the others are only counted.
    Progress is reported both to the "Level 1" logger and to standard output.

    :param instrument_name: Instrument name passed to the pipeline, the Level 1A
        collection and the Level 1B output.
    :param group: Name of the group of records to process.
    :param starttime: Start of the time range passed to the collection ``load``.
    :param endtime: End of the time range passed to the collection ``load``.
    :param iwg1_filename: IWG1 file given to the pipeline.
    :param level1b_basedir: ``basedir`` passed to the Level 1B output.
    :param level1a_basedir: ``basedir`` passed to the Level 1A collection.
    :param level1a_version: ``versionstr`` passed to the Level 1A collection.
    :param level1b_version: Version tuple passed to the pipeline and to the
        Level 1B output.
    """
    # The logger receives lazy %-arguments; print() needs the formatted text.
    g_logger.info('Processing records from Level 1A to Level 1B for group %s from %s to %s',
                  str(group), str(starttime), str(endtime))
    print('Processing records from Level 1A to Level 1B for group %s from %s to %s'
          % (str(group), str(starttime), str(endtime)))

    with L1A_to_L1B_Pipeline(instrument_name, iwg1_filename, level1b_version) as pipeline:
        # NOTE(review): the output object is never explicitly closed here; confirm
        # whether SHOW_Level1B_Output needs a close/flush call.
        output = SHOW_Level1B_Output(instrument_name, group, level1b_version, basedir=level1b_basedir)
        numgood = 0
        numbad = 0

        with SHOWLevel1ACollection(instrument_name, group,
                                   basedir=level1a_basedir,
                                   versionstr=level1a_version) as l1a_collection:
            l1a_collection.load(starttime, endtime)
            n = l1a_collection.numrecords()
            if n > 0:
                g_logger.info('Group <%s>: %d level 1A records found', str(group), n)
                print('Group <%s>: %d level1A records found' % (str(group), n))
                for l1a in l1a_collection:
                    if pipeline.reset_pipeline(l1a) and pipeline.process_pipeline():
                        numgood += 1
                        output.write_record(pipeline.Level1B())
                    else:
                        numbad += 1

            # Summary is reported even when no records were found.
            g_logger.info('Group <%s>: Total %d records processed. %d good, %d bad',
                          str(group), n, numgood, numbad)
            print('Group <%s>: %d records processed. %d good, %d bad' % (str(group), n, numgood, numbad))

    print('Finished processing groups')