Source code for showapi.level1.l1b_io

import netCDF4
import numpy as np
import datetime
from typing import  Tuple
from .showlevel1b import SHOWL1BEntry
from .l1_fileorganizer import L1_Netcdf_FileOrganizer
from show_config.show_configuration import Parameters

#------------------------------------------------------------------------------
#           SHOW_Level1B_Input
#------------------------------------------------------------------------------

class SHOW_Level1B_Input:
    """Reader-side handle for SHOW level 1B netCDF files.

    The constructor only records bookkeeping state and builds the
    ``L1_Netcdf_FileOrganizer`` for the ``'l1b'`` product; no file is opened.
    """

    def __init__(self, basedir: str, groupname: str, version: str = 'v000'):
        """Initialise the bookkeeping state.

        Args:
            basedir: Forwarded to ``L1_Netcdf_FileOrganizer`` as its first
                positional argument.
            groupname: Forwarded to ``L1_Netcdf_FileOrganizer`` as its second
                positional argument.
            version: Forwarded to ``L1_Netcdf_FileOrganizer`` as
                ``versionstr``.
        """
        # Placeholder timestamp used until a real file time is known.
        placeholder_time = datetime.datetime(year=1980, month=1, day=1)
        self.currentfiletime = placeholder_time
        # No reference time has been established yet.
        self.t0 = None
        self.fileorganizer = L1_Netcdf_FileOrganizer(
            basedir,
            groupname,
            'l1b',
            versionstr=version,
        )
#------------------------------------------------------------------------------
#           class SHOW_Level1B_Output
#------------------------------------------------------------------------------
class SHOW_Level1B_Output:
    """Writer for SHOW level 1B records stored in netCDF4 files.

    ``write_record`` is the entry point: it asks the file organizer whether
    the record belongs to a new file, creates an empty file (dimensions,
    variables, units and static data) when needed, and then appends the
    record along the unlimited ``time`` dimension.
    """

    def __init__(self,
                 instrumentname : str,
                 groupname:str,
                 version : Tuple[int,int,int],
                 basedir: str = None,
                 ):
        """Load the instrument configuration and build the file organizer.

        Args:
            instrumentname: Passed to ``Parameters`` to load the instrument
                configuration.
            groupname: Name of the netCDF group the data is written to. When
                the organizer reports ``None`` or an empty group name the
                data is written to the file's root group instead.
            version: Forwarded to ``L1_Netcdf_FileOrganizer`` as
                ``versionstr``.
                NOTE(review): annotated as a 3-tuple of ints here although
                the keyword is called ``versionstr`` -- confirm what the
                organizer expects.
            basedir: Base directory handed to the organizer. When ``None``
                it is read from
                ``config['files']['level1b_base_directory']``.
        """
        self.params = Parameters(instrumentname)                # type: Parameters
        if basedir is None:
            basedir = self.params.config['files']['level1b_base_directory']
        # Placeholder timestamp used until a real file time is known.
        self.currentfiletime = datetime.datetime(1980, 1, 1)
        # Reference time of the file currently being written; None means
        # that the file has not been created yet.
        self.t0 = None
        self.fileorganizer = L1_Netcdf_FileOrganizer(basedir, groupname, 'l1b', versionstr=version)

    #------------------------------------------------------------------------------
    #           _as_datetime
    #------------------------------------------------------------------------------

    @staticmethod
    def _as_datetime(timestamp) -> datetime.datetime:
        """Convert a record time to ``datetime.datetime`` via ``np.datetime64`` at microsecond resolution."""
        return np.datetime64(timestamp, 'us').astype(datetime.datetime)

    #------------------------------------------------------------------------------
    #           _select_group
    #------------------------------------------------------------------------------

    def _select_group(self, filegrp):
        """Return the group records are stored in: the organizer's named group, or the file root if it has none."""
        groupname = self.fileorganizer.groupname
        if groupname is not None and len(groupname) > 0:
            # Called from both the create and the append path, so this relies
            # on createGroup handing back the group when it already exists.
            return filegrp.createGroup(groupname)
        return filegrp

    #------------------------------------------------------------------------------
    #           create_empty_l1b_netcdf
    #------------------------------------------------------------------------------

    def create_empty_l1b_netcdf( self, l1b : SHOWL1BEntry ):
        """Create the netCDF file that will hold ``l1b`` and following records.

        Defines the dimensions and variables, assigns the ``units``
        attributes and writes the record-independent data (sensor names,
        IWG1 field names, height rows, version and index vectors). No
        time-dependent data is written here.

        ``self.t0`` is set to the record time truncated to the whole minute;
        the ``time`` variable is expressed in seconds since that instant.

        Args:
            l1b: Record used to size the dimensions (``spectrum.shape``,
                ``sensor_names``, ``aircraft_iwg1_names``) and to supply the
                static data.
        """
        t0 = self._as_datetime(l1b.time).replace(second=0, microsecond=0)
        numheights, numwave = l1b.spectrum.shape[0], l1b.spectrum.shape[1]
        num_iwg1 = len(l1b.aircraft_iwg1_names)

        # The context manager closes the file even if a definition or a
        # write below raises.
        with netCDF4.Dataset(self.fileorganizer.filename, 'w', format="NETCDF4") as filegrp:
            rootgrp = self._select_group(filegrp)

            rootgrp.createDimension("time", None)               # unlimited
            rootgrp.createDimension("heightrow", numheights)
            rootgrp.createDimension("wavelengthdim", numwave)
            rootgrp.createDimension("sensor_names", len(l1b.sensor_names))
            rootgrp.createDimension("aircraft_iwg1_dim", num_iwg1)
            rootgrp.createDimension("vector_components", 3)
            rootgrp.createDimension("version", 3)

            # The builtin ``str`` requests a variable-length string variable.
            # The former ``np.str`` alias no longer exists in current NumPy.
            # Single dimensions are passed as real 1-tuples.
            time                  = rootgrp.createVariable("time",                  np.float64, ("time",))
            heightrow             = rootgrp.createVariable("heightrow",             np.int16,   ("heightrow",))
            wavelength            = rootgrp.createVariable("wavelength",            np.float64, ("time", "wavelengthdim"))
            wavelengthdimv        = rootgrp.createVariable("wavelengthdim",         np.int16,   ("wavelengthdim",))
            aircraftiwg1dimv      = rootgrp.createVariable("aircraft_iwg1_dim",     np.int16,   ("aircraft_iwg1_dim",))
            exposure_time         = rootgrp.createVariable("exposure_time",         np.float64, ("time",))
            sensor_names          = rootgrp.createVariable("sensor_names",          str,        ("sensor_names",))
            temperatures          = rootgrp.createVariable("temperatures",          np.float32, ("time", "sensor_names"))
            spectrum              = rootgrp.createVariable("spectrum",              np.float64, ("time", "heightrow", "wavelengthdim"), least_significant_digit=2, zlib=True)
            phase                 = rootgrp.createVariable("phase",                 np.float64, ("time", "heightrow", "wavelengthdim"), least_significant_digit=2, zlib=True)
            error                 = rootgrp.createVariable("error",                 np.float64, ("time", "heightrow", "wavelengthdim"), least_significant_digit=2, zlib=True)
            avg_signal            = rootgrp.createVariable("avg_signal",            np.float64, ("time", "heightrow"))
            locationxyz           = rootgrp.createVariable("locationxyz",           np.float64, ("time", "vector_components"))
            pixelrow_lookxyz      = rootgrp.createVariable("pixelrow_lookxyz",      np.float64, ("time", "heightrow", "vector_components"))
            pixelrow_pitch_offset = rootgrp.createVariable("pixelrow_pitch_offset", np.float64, ("time", "heightrow"))
            aircraft_iwg1_names   = rootgrp.createVariable("aircraft_iwg1_names",   str,        ("aircraft_iwg1_dim",))
            aircraft_iwg1         = rootgrp.createVariable("aircraft_iwg1",         np.float64, ("time", "aircraft_iwg1_dim"))
            aircraft_nose         = rootgrp.createVariable("aircraft_nose",         np.float64, ("time", "vector_components"))
            aircraft_starboard    = rootgrp.createVariable("aircraft_starboard",    np.float64, ("time", "vector_components"))
            aircraft_wheels       = rootgrp.createVariable("aircraft_wheels",       np.float64, ("time", "vector_components"))
            version               = rootgrp.createVariable("version",               "i2",       ("version",))

            time.units                  = 'seconds since ' + t0.isoformat()
            heightrow.units             = 'Pixels'
            wavelength.units            = "nanometers"
            exposure_time.units         = "milli-seconds"
            sensor_names.units          = "Celsius"
            temperatures.units          = "Celsius"
            spectrum.units              = "Relative Units"
            phase.units                 = "Degrees"
            error.units                 = "Relative Units"
            avg_signal.units            = "Relative Units"
            locationxyz.units           = "metres"
            pixelrow_lookxyz.units      = "unit vector"
            pixelrow_pitch_offset.units = "degrees"
            aircraft_nose.units         = "unit vector"
            aircraft_starboard.units    = "unit vector"
            aircraft_wheels.units       = "unit vector"
            aircraft_iwg1_names.units   = "degrees"
            aircraft_iwg1.units         = "degrees"

            # String variables are filled one element at a time.
            for index, sensor in enumerate(l1b.sensor_names):
                sensor_names[index] = sensor

            # NOTE(review): six fixed names are written although the dimension
            # is sized from len(l1b.aircraft_iwg1_names) -- confirm that the
            # two always agree.
            aircraft_iwg1_names[:] = np.array(["latitude", "longitude", "altitude", "pitch", "roll", "heading"], dtype=str)
            heightrow[:]           = l1b.heightrow
            version[:]             = l1b.version
            wavelengthdimv[:]      = np.arange(0, numwave, 1)
            aircraftiwg1dimv[:]    = np.arange(0, num_iwg1, 1)

        # Published only after the file was created and closed successfully,
        # so a failed creation is retried by the next write_record call
        # instead of being appended to.
        self.t0 = t0

    #------------------------------------------------------------------------------
    #           append_l1b_to_netcdf
    #------------------------------------------------------------------------------

    def append_l1b_to_netcdf( self, l1b : SHOWL1BEntry):
        """Append one record at the end of the unlimited ``time`` dimension.

        The file must already have been created by
        ``create_empty_l1b_netcdf`` so that the variables exist and
        ``self.t0`` is set; the record time is stored as seconds elapsed
        since ``self.t0``.

        Args:
            l1b: Record to append.
        """
        with netCDF4.Dataset(self.fileorganizer.filename, 'a', format="NETCDF4") as filegrp:
            variables = self._select_group(filegrp).variables

            time = variables["time"]
            # Current number of stored records == index of the new record.
            ioffset = time.size
            elapsed = self._as_datetime(l1b.time) - self.t0

            time[ioffset]                                 = elapsed.total_seconds()
            variables["wavelength"][ioffset, :]            = l1b.wavelength
            variables["exposure_time"][ioffset]            = l1b.exposure_time
            variables["temperatures"][ioffset, :]          = l1b.temperatures
            variables["spectrum"][ioffset, :, :]           = l1b.spectrum
            variables["phase"][ioffset, :, :]              = l1b.phase
            variables["error"][ioffset, :, :]              = l1b.error
            variables["avg_signal"][ioffset, :]            = l1b.avg_signal
            variables["locationxyz"][ioffset, :]           = l1b.locationxyz
            variables["pixelrow_lookxyz"][ioffset, :, :]   = l1b.pixelrow_lookxyz
            variables["pixelrow_pitch_offset"][ioffset, :] = l1b.pixelrow_pitch_offset
            variables["aircraft_iwg1"][ioffset, :]         = l1b.aircraft_iwg1
            variables["aircraft_nose"][ioffset, :]         = l1b.aircraft_nose
            variables["aircraft_starboard"][ioffset, :]    = l1b.aircraft_starboard
            variables["aircraft_wheels"][ioffset, :]       = l1b.aircraft_wheels

    #------------------------------------------------------------------------------
    #           write_record
    #------------------------------------------------------------------------------

    def write_record( self, l1b : SHOWL1BEntry):
        """Write one record, creating the target file first when necessary.

        A new file is created when ``fileorganizer.checkfiletime`` returns a
        true value for the record time, or when no file has been created by
        this object yet (``self.t0 is None``).

        Args:
            l1b: Record to write.
        """
        if self.fileorganizer.checkfiletime(l1b.time):
            # Forget the previous file so that the check below recreates it.
            self.t0 = None
        if self.t0 is None:
            self.create_empty_l1b_netcdf(l1b)
        self.append_l1b_to_netcdf(l1b)