Source code for showapi.level1.l1_fileorganizer

import numpy as np
import datetime
import dateutil.parser
import xarray
import os
import os.path
import re
import numpy as np
from typing import Tuple, Union

#------------------------------------------------------------------------------
#           class L1_Netcdf_FileOrganizer
#------------------------------------------------------------------------------

class L1_Netcdf_FileOrganizer():
    """
    Organizes per-minute NetCDF files of one processing level on disk.

    Files are laid out as::

        <basedir>/<groupname>/<YYYYMMDD>/<levelname>_<YYYYMMDD>-<HHMM>_v<MM>.<mm>.<dd>.nc

    The writer side uses :meth:`checkfiletime` to find out when a new file
    (one per minute) has to be started.  The reader side uses
    :meth:`load_list_of_files` / :meth:`load` to find and open all files in a
    time range whose version is at least the version given at construction.
    """

    def __init__(self,
                 basedir: str,
                 groupname: str,
                 levelname: str,
                 versionstr: Union[str, Tuple[int, int, int]]):
        """
        :param basedir: Base directory under which the group directory lives.
        :param groupname: Name of the group. Used as the sub-directory of
            ``basedir`` and as the NetCDF group passed to xarray in :meth:`load`.
        :param levelname: Level prefix of the file names, e.g. ``l1a`` or ``l1b``.
        :param versionstr: Either a ``"major.minor.detail"`` string or a
            3-element sequence of integers. It is the version written into new
            file names and the minimum version accepted when listing files.
        """
        if isinstance(versionstr, str):
            fields = versionstr.split('.')
            version = [int(fields[0]), int(fields[1]), int(fields[2])]
        else:
            version = versionstr

        self.minversion = self.VersionNumber(version[0], version[1], version[2])
        # Sentinel that never equals a real data time, so the first call to
        # checkfiletime always reports a new file.
        self.currentfiletime = datetime.datetime(1980, 1, 1)
        self.version = 'v%02d.%02d.%02d' % (version[0], version[1], version[2])
        self.levelname = levelname
        self.basedir = basedir
        self.filename = None
        self.groupname = groupname

    #------------------------------------------------------------------------------
    #           VersionNumber
    #------------------------------------------------------------------------------

    def VersionNumber(self, major, minor, detail) -> int:
        """
        Encode a three-part version as a single integer that sorts like the version.

        Each of ``minor`` and ``detail`` gets three decimal digits, so the
        ordering is correct as long as both are below 1000.  (Weights of
        1000/100/1 would let a minor number of 10 or more spill into the major
        digit, e.g. 0.12.0 would rank above 1.0.0.)

        :param major: Major version (int or numeric string).
        :param minor: Minor version (int or numeric string).
        :param detail: Detail/patch version (int or numeric string).
        :return: Integer usable for ``<``/``>=`` comparisons between versions.
        """
        return 1000000 * int(major) + 1000 * int(minor) + int(detail)

    #------------------------------------------------------------------------------
    #           checkfiletime
    #------------------------------------------------------------------------------

    def checkfiletime(self, usert: np.datetime64) -> bool:
        """
        Used during write processes to see if the current file is exhausted.

        The time is truncated to the minute.  If that minute differs from the
        minute of the current file, ``self.currentfiletime`` and
        ``self.filename`` are updated to the new file and the day directory is
        created if necessary.

        :param usert: Time of the record about to be written.
        :return: True if a new file was started (``self.filename`` changed),
            False if the record still belongs to the current file.
        """
        t = np.datetime64(usert, 'us').astype(datetime.datetime)
        thisfiletime = t.replace(second=0, microsecond=0)
        newfile = thisfiletime != self.currentfiletime
        if newfile:
            self.currentfiletime = thisfiletime
            dayst = thisfiletime.strftime('%Y%m%d')
            timestr = thisfiletime.strftime('%Y%m%d-%H%M')
            groupdir = os.path.normpath(os.path.join(self.basedir, str(self.groupname), dayst))
            self.filename = os.path.join(groupdir, self.levelname + '_' + timestr + '_' + self.version + '.nc')
            os.makedirs(groupdir, exist_ok=True)
        return newfile

    #------------------------------------------------------------------------------
    #           load_list_of_files
    #------------------------------------------------------------------------------

    def load_list_of_files(self,
                           tmin: Union[datetime.datetime, None] = None,
                           tmax: Union[datetime.datetime, None] = None) -> list:
        """
        List the files of this group and level that fall inside a time range.

        A file is selected when the minute in its name lies in
        ``[tmin, tmax]`` (both truncated to the minute, ``tmax`` inclusive) and
        its version is at least the version given at construction.

        :param tmin: Start of the range. Defaults to 2000-01-01.
        :param tmax: End of the range. Defaults to 2099-01-01.
        :return: Full path names, sorted by day directory and then file name
            (i.e. chronologically).
        :raises OSError: If the group directory cannot be listed.
        """
        if tmin is None:
            tmin = datetime.datetime(2000, 1, 1)
        if tmax is None:
            tmax = datetime.datetime(2099, 1, 1)

        minday = datetime.datetime(tmin.year, tmin.month, tmin.day)
        maxday = datetime.datetime(tmax.year, tmax.month, tmax.day) + datetime.timedelta(days=1)
        tminminute = datetime.datetime(tmin.year, tmin.month, tmin.day, hour=tmin.hour, minute=tmin.minute)
        tmaxminute = (datetime.datetime(tmax.year, tmax.month, tmax.day, hour=tmax.hour, minute=tmax.minute)
                      + datetime.timedelta(minutes=1))

        # str() mirrors checkfiletime, which also tolerates a non-string group name.
        groupdir = os.path.join(self.basedir, str(self.groupname))

        dir_pattern = re.compile(r'(\d\d\d\d)(\d\d)(\d\d)')
        # levelname is escaped so that regex metacharacters in it are taken
        # literally; fullmatch is used below so names such as "*.nc.tmp" are
        # not mistaken for finished files.
        file_pattern = re.compile(
            r'%s_(\d\d\d\d)(\d\d)(\d\d)-(\d\d)(\d\d)_v([0-9]+)\.([0-9]+)\.([0-9]+)\.nc'
            % re.escape(self.levelname))

        goodfiles = []
        # os.listdir order is arbitrary; sorting makes the result chronological.
        for dirname in sorted(os.listdir(groupdir)):
            m = dir_pattern.match(dirname)
            if m is None:
                continue
            try:
                day = datetime.datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)))
            except ValueError:
                # Eight digits that are not a calendar date: not one of our day directories.
                continue
            if not (minday <= day < maxday):
                continue
            thisdir = os.path.join(groupdir, dirname)
            if not os.path.isdir(thisdir):
                continue

            for filename in sorted(os.listdir(thisdir)):
                m = file_pattern.fullmatch(filename)
                if m is None:
                    continue
                try:
                    t = datetime.datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)),
                                          hour=int(m.group(4)), minute=int(m.group(5)))
                except ValueError:
                    continue
                thisversion = self.VersionNumber(m.group(6), m.group(7), m.group(8))
                if (tminminute <= t < tmaxminute) and (thisversion >= self.minversion):
                    goodfiles.append(os.path.join(thisdir, filename))
        return goodfiles

    #------------------------------------------------------------------------------
    #           convert_time_entry
    #------------------------------------------------------------------------------

    def convert_time_entry(self, usert) -> datetime.datetime:
        """
        Convert a user supplied time to ``datetime.datetime``.

        :param usert: None, a ``datetime.datetime`` (or subclass), a string
            understood by ``dateutil.parser.parse``, or a ``numpy.datetime64``.
        :return: The time as ``datetime.datetime``; None is passed through.
        :raises ValueError: If a ``numpy.datetime64`` is NaT or cannot be
            represented as ``datetime.datetime``.
        :raises RuntimeError: If the type of ``usert`` is not supported.
        """
        if (usert is None) or isinstance(usert, datetime.datetime):
            t = usert
        elif isinstance(usert, str):
            t = dateutil.parser.parse(usert)
        elif isinstance(usert, np.datetime64):
            if np.isnat(usert):
                raise ValueError('cannot convert NaT to a datetime')
            # Going through microseconds guarantees a datetime (not a date or
            # an integer) for every unit numpy may carry.
            t = usert.astype('datetime64[us]').astype(datetime.datetime)
            if not isinstance(t, datetime.datetime):
                raise ValueError('numpy.datetime64 value is outside the range of datetime.datetime')
        else:
            raise RuntimeError('unsupported time type')
        return t

    #------------------------------------------------------------------------------
    #           load
    #------------------------------------------------------------------------------

    def load(self,
             tmin: Union[str, datetime.datetime, np.datetime64, None] = None,
             tmax: Union[str, datetime.datetime, np.datetime64, None] = None):
        """
        Open all files in a time range and concatenate them along ``time``.

        :param tmin: Start of the range, in any form accepted by
            :meth:`convert_time_entry`. None means no lower limit.
        :param tmax: End of the range, in any form accepted by
            :meth:`convert_time_entry`. None means no upper limit.
        :return: An xarray dataset holding the group ``self.groupname`` of all
            selected files, or None if no file was found.
        """
        ttmin = self.convert_time_entry(tmin)
        ttmax = self.convert_time_entry(tmax)
        files = self.load_list_of_files(tmin=ttmin, tmax=ttmax)

        datasets = [xarray.open_dataset(name, decode_cf=True, group=self.groupname) for name in files]
        if not datasets:
            return None
        if len(datasets) == 1:
            return datasets[0]
        # One concat over the whole list avoids re-copying the accumulated
        # data for every additional file.
        return xarray.concat(datasets, dim="time")