# Source code for showapi.level1.l1a_io

import math
import os
import os.path
import netCDF4
import numpy as np
from typing import  Dict,List
import datetime
import argcommon.mjd

from .l1_fileorganizer import L1_Netcdf_FileOrganizer

#------------------------------------------------------------------------------
#          class SHOW_Level1A_Output
#------------------------------------------------------------------------------

class SHOW_Level1A_Output:
    """Write SHOW Level 1A records to NetCDF4 files, one record per time step.

    The output file name and optional group name come from an
    ``L1_Netcdf_FileOrganizer`` built for the ``'l1a'`` product and tagged
    with the pipeline version (formatted as ``MM.mm.dd``).

    Typical use is to call :meth:`write_record` once per processed image; it
    creates a new file when needed and appends the record to it.
    """

    def __init__(self, basedir: str, groupname: str, pipeline: 'L0_to_L1A_Pipeline'):
        """Capture the pipeline version and set up the output file organizer.

        Args:
            basedir: Passed to ``L1_Netcdf_FileOrganizer`` as its first argument.
            groupname: Passed to ``L1_Netcdf_FileOrganizer``; the organizer's
                ``groupname`` later selects the NetCDF group that is written.
            pipeline: Object providing ``versionmajor``, ``versionminor`` and
                ``versiondetail``.
        """
        self.versionmajor = pipeline.versionmajor
        self.versionminor = pipeline.versionminor
        self.versiondetail = pipeline.versiondetail
        versionstr = '%02d.%02d.%02d' % (self.versionmajor, self.versionminor, self.versiondetail)

        self.currentfiletime = datetime.datetime(1980, 1, 1)
        # Reference epoch (MJD, floored to a whole second) of the current file.
        # None means "no file has been created yet for the current period".
        self.mjd0 = None
        self.fileorganizer = L1_Netcdf_FileOrganizer(basedir, groupname, 'l1a', versionstr=versionstr)

    #------------------------------------------------------------------------------
    #           _select_group
    #------------------------------------------------------------------------------

    def _select_group(self, filegrp):
        """Return the group records live in: the named group, or the file root.

        ``createGroup`` is used in both write and append mode, exactly as the
        original code did.
        NOTE(review): this relies on netCDF4 returning the existing group when
        it is already present in the file - confirm against the netCDF4 docs.
        """
        groupname = self.fileorganizer.groupname
        if groupname:
            return filegrp.createGroup(groupname)
        return filegrp

    #------------------------------------------------------------------------------
    #           _store_temperature_row
    #------------------------------------------------------------------------------

    @staticmethod
    def _store_temperature_row(variable, row: int, values) -> None:
        """Write one reading per sensor column into ``variable[row, ...]``.

        Args:
            variable: 2-D ``(time, sensor)`` target supporting numpy-style
                slice assignment (a netCDF4 variable or a numpy array).
            row: Index of the record (time step) being written.
            values: Sequence of temperature readings, one per sensor.
        """
        readings = np.asarray(values)
        if len(readings) == 0:
            # Nothing to store; leave the row untouched, as the original loop did.
            return
        variable[row, 0:len(readings)] = readings

    #------------------------------------------------------------------------------
    #           create_empty_l1a_netcdf
    #------------------------------------------------------------------------------

    def create_empty_l1a_netcdf(self, data: 'L0_to_L1A_Pipeline'):
        """Create (overwriting) the output file and define its layout.

        Defines the dimensions, variables and unit attributes, fills the
        coordinate variables (``heightrow``, ``pixelcolumn``, ``version``,
        ``sensor_names``) and sets ``self.mjd0`` to the time of ``data.utc``
        floored to a whole second. No image record is written here.

        Args:
            data: Object providing ``utc``, ``sub_window`` as
                ``(ix0, ix1, iy0, iy1)`` in 1-based inclusive indexing, and
                ``temp_labels``.
        """
        mjd = argcommon.mjd.MJD()
        mjd.from_datetime(data.utc)
        minmjd = mjd.MJD()
        # Floor to a whole second so the "seconds since" epoch is a round time.
        self.mjd0 = math.floor(minmjd * 86400.0) / 86400.0

        ix0, ix1, iy0, iy1 = data.sub_window       # sub window using 1 based indexing
        temp_labels = data.temp_labels

        # The context manager closes the file even if a definition step fails.
        with netCDF4.Dataset(self.fileorganizer.filename, 'w', format="NETCDF4") as filegrp:
            rootgrp = self._select_group(filegrp)

            rootgrp.createDimension("heightrow", iy1 - iy0 + 1)
            rootgrp.createDimension("pixelcolumn", ix1 - ix0 + 1)
            rootgrp.createDimension("version", 3)
            rootgrp.createDimension("time", None)                   # unlimited
            rootgrp.createDimension("sensor_names", len(temp_labels))

            times = rootgrp.createVariable("time", "f8", ("time",))
            heightrow = rootgrp.createVariable("heightrow", "i2", ("heightrow",))
            version = rootgrp.createVariable("version", "i2", ("version",))
            pixelcolumn = rootgrp.createVariable("pixelcolumn", "i2", ("pixelcolumn",))
            sensor_names = rootgrp.createVariable("sensor_names", "str", ("sensor_names",))
            exposure_time = rootgrp.createVariable("exposure_time", "f8", ("time",))
            temperatures = rootgrp.createVariable("temperatures", "f4", ("time", "sensor_names"))
            image = rootgrp.createVariable("image", "f8", ("time", "heightrow", "pixelcolumn"),
                                           least_significant_digit=2, zlib=True)
            error = rootgrp.createVariable("error", "f8", ("time", "heightrow", "pixelcolumn"),
                                           least_significant_digit=2, zlib=True)
            # A per-row "flags" variable is deliberately not written at present.

            mjdobj = argcommon.mjd.MJD(self.mjd0)
            times.units = 'seconds since ' + mjdobj.AsUTCStr(domicrosecs=True)
            image.units = "A/D Digital Numbers (DN)"
            error.units = "A/D Digital Numbers (DN)"
            exposure_time.units = "milli-seconds"
            temperatures.units = "Celsius"
            # NOTE(review): a units attribute of "Celsius" on the sensor *names*
            # looks unintended; kept as-is so existing file readers are unaffected.
            sensor_names.units = "Celsius"
            heightrow.units = "Pixels"
            pixelcolumn.units = "Pixels"

            for index, label in enumerate(temp_labels):
                sensor_names[index] = label

            # Coordinates are stored as zero-based detector indices.
            heightrow[:] = np.arange(iy0 - 1, iy1).astype('i2')
            pixelcolumn[:] = np.arange(ix0 - 1, ix1).astype('i2')
            version[:] = [self.versionmajor, self.versionminor, self.versiondetail]

    #------------------------------------------------------------------------------
    #           append_l1a_to_netcdf
    #------------------------------------------------------------------------------

    def append_l1a_to_netcdf(self, data: 'L0_to_L1A_Pipeline'):
        """Append one record to the end of the existing output file.

        The record is written at index ``time.size``, i.e. directly after the
        last stored record. ``self.mjd0`` must already be set (it is set by
        :meth:`create_empty_l1a_netcdf`).

        Args:
            data: Object providing ``utc``, ``exposure_time``,
                ``temperatures``, ``l1image`` and ``l1error``.
        """
        # The context manager closes the file even if one of the writes fails.
        with netCDF4.Dataset(self.fileorganizer.filename, 'a', format="NETCDF4") as filegrp:
            rootgrp = self._select_group(filegrp)

            times = rootgrp.variables["time"]
            exposure_time = rootgrp.variables["exposure_time"]
            temperatures = rootgrp.variables["temperatures"]
            image = rootgrp.variables["image"]
            error = rootgrp.variables["error"]

            ioffset = times.size
            mjd = argcommon.mjd.MJD()
            mjd.from_datetime(data.utc)

            times[ioffset] = (mjd.MJD() - self.mjd0) * 86400.0
            # NOTE(review): the variable is labelled "milli-seconds", so this
            # presumably converts from microseconds - confirm with the pipeline.
            exposure_time[ioffset] = data.exposure_time / 1000.0
            # One reading per sensor column. The previous code assigned each
            # scalar to the whole row, so every sensor ended up holding the
            # last reading.
            self._store_temperature_row(temperatures, ioffset, data.temperatures)
            image[ioffset, :, :] = data.l1image
            error[ioffset, :, :] = data.l1error

    #------------------------------------------------------------------------------
    #           write_record
    #------------------------------------------------------------------------------

    def write_record(self, data: 'L0_to_L1A_Pipeline'):
        """Write one record, starting a new output file first when required.

        A new file is created when the file organizer's ``checkfiletime``
        returns a true value for ``data.utc``, or when no file has been
        created yet (``self.mjd0`` is None).

        Args:
            data: The pipeline object holding the record to write.
        """
        if self.fileorganizer.checkfiletime(data.utc):
            # Drop the old epoch so a fresh file is created below.
            self.mjd0 = None
        if self.mjd0 is None:
            self.create_empty_l1a_netcdf(data)
        self.append_l1a_to_netcdf(data)