import numpy as np
import datetime
import dateutil.parser
import xarray
import os
import os.path
import re
import numpy as np
from typing import Tuple, Union
#------------------------------------------------------------------------------
# class L1_Netcdf_FileOrganizer
#------------------------------------------------------------------------------
class L1_Netcdf_FileOrganizer:
    """
    Organize one-file-per-minute netCDF products inside a directory tree.

    Files are laid out as::

        <basedir>/<groupname>/<YYYYMMDD>/<levelname>_<YYYYMMDD>-<HHMM>_v<MM>.<mm>.<dd>.nc

    :meth:`checkfiletime` is the write-side helper: it decides when a new file
    must be started and computes its name.  :meth:`load_list_of_files` and
    :meth:`load` are the read-side helpers: they locate the files covering a
    time range (with at least the configured version) and open them with xarray.
    """

    def __init__(self, basedir: str, groupname: str, levelname: str,
                 versionstr: Union[str, Tuple[int, int, int]]):
        """
        :param basedir: Base directory under which the group directory lives.
        :param groupname: Name of the sub-directory of ``basedir`` holding the
            files. It is also passed to xarray as the netCDF group when loading.
        :param levelname: Prefix of the file names, e.g. ``l1a`` or ``l1b``.
        :param versionstr: Version either as a ``'major.minor.detail'`` string or
            as a ``(major, minor, detail)`` sequence of integers. It is the
            version written into new file names and the minimum version
            accepted when listing existing files.
        """
        if isinstance(versionstr, str):
            fields = versionstr.split('.')
            version = [int(fields[0]), int(fields[1]), int(fields[2])]
        else:
            version = versionstr
        self.minversion = self.VersionNumber(version[0], version[1], version[2])
        # Sentinel "no file yet" time so that the first call to checkfiletime
        # (for any realistic time) reports that a new file is needed.
        self.currentfiletime = datetime.datetime(1980, 1, 1)
        self.version = 'v%02d.%02d.%02d' % (version[0], version[1], version[2])
        self.levelname = levelname
        self.basedir = basedir
        self.filename = None            # Set by checkfiletime once a file time is known
        self.groupname = groupname

    #------------------------------------------------------------------------------
    #           VersionNumber
    #------------------------------------------------------------------------------
    def VersionNumber(self, major, minor, detail):
        """
        Fold a three-part version into a single integer that can be compared.

        The weights assume ``minor`` and ``detail`` are in the range 0..99,
        matching the two-digit fields used in the file names. (Weights of
        1000/100/1 would make e.g. 1.10.0 and 2.0.0 compare equal.)

        :param major: Major version, int or numeric string.
        :param minor: Minor version, int or numeric string.
        :param detail: Detail (patch) version, int or numeric string.
        :return: ``10000*major + 100*minor + detail``
        """
        return 10000 * int(major) + 100 * int(minor) + int(detail)

    #------------------------------------------------------------------------------
    #           checkfiletime
    #------------------------------------------------------------------------------
    def checkfiletime(self, usert: np.datetime64):
        """
        Used during write processes to see if the current file is exhausted.

        The time is truncated to the minute. If that minute differs from the
        minute of the current file, the current file time and ``self.filename``
        are updated and the day directory is created if necessary.

        :param usert: Time of the record about to be written.
        :return: True if a new file (new minute) has been started, else False.
        """
        t = np.datetime64(usert, 'us').astype(datetime.datetime)
        thisfiletime = t.replace(second=0, microsecond=0)
        newfile = thisfiletime != self.currentfiletime
        if newfile:
            self.currentfiletime = thisfiletime
            dayst = thisfiletime.strftime('%Y%m%d')
            timestr = thisfiletime.strftime('%Y%m%d-%H%M')
            groupdir = os.path.normpath(self.basedir + os.sep + str(self.groupname) + os.sep + dayst)
            self.filename = groupdir + os.sep + self.levelname + '_' + timestr + '_' + self.version + '.nc'
            os.makedirs(groupdir, exist_ok=True)
        return newfile

    #------------------------------------------------------------------------------
    #           load_list_of_files
    #------------------------------------------------------------------------------
    def load_list_of_files(self, tmin: datetime.datetime = None, tmax: datetime.datetime = None) -> list:
        """
        List the files of this level whose minute lies within ``[tmin, tmax]``
        and whose version is at least the version given at construction.

        Both limits are truncated to the minute and are inclusive. Entries
        whose names do not exactly follow the directory/file naming scheme, or
        whose digits do not form a valid date/time, are ignored.

        :param tmin: Start of the time range. Defaults to 2000-01-01.
        :param tmax: End of the time range. Defaults to 2099-01-01.
        :return: Full path names sorted by file time (then version, then name).
        """
        if tmin is None:
            tmin = datetime.datetime(2000, 1, 1)
        if tmax is None:
            tmax = datetime.datetime(2099, 1, 1)

        minday = datetime.datetime(tmin.year, tmin.month, tmin.day)
        maxday = datetime.datetime(tmax.year, tmax.month, tmax.day)
        tminminute = datetime.datetime(tmin.year, tmin.month, tmin.day, tmin.hour, tmin.minute)
        tmaxminute = datetime.datetime(tmax.year, tmax.month, tmax.day, tmax.hour, tmax.minute)

        # Compile once; fullmatch is used below so that look-alike names
        # (e.g. "xxx.nc.tmp" or "20200101_old") are not picked up.
        day_pattern = re.compile(r'(\d\d\d\d)(\d\d)(\d\d)')
        file_pattern = re.compile(
            r'%s_(\d\d\d\d)(\d\d)(\d\d)-(\d\d)(\d\d)_v([0-9]+)\.([0-9]+)\.([0-9]+)\.nc'
            % re.escape(self.levelname))

        groupdir = self.basedir + os.sep + str(self.groupname)
        found = []
        for dirname in os.listdir(groupdir):
            daymatch = day_pattern.fullmatch(dirname)
            if daymatch is None:
                continue
            try:
                dirday = datetime.datetime(int(daymatch.group(1)), int(daymatch.group(2)), int(daymatch.group(3)))
            except ValueError:
                continue                # Eight digits that are not a real calendar date
            if not (minday <= dirday <= maxday):
                continue
            thisdir = os.path.join(groupdir, dirname)
            if not os.path.isdir(thisdir):
                continue                # A plain file that happens to look like a day directory

            for filename in os.listdir(thisdir):
                m = file_pattern.fullmatch(filename)
                if m is None:
                    continue
                try:
                    t = datetime.datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)),
                                          hour=int(m.group(4)), minute=int(m.group(5)))
                except ValueError:
                    continue            # Digits that are not a real date/time
                thisversion = self.VersionNumber(m.group(6), m.group(7), m.group(8))
                if (tminminute <= t <= tmaxminute) and (thisversion >= self.minversion):
                    found.append((t, thisversion, os.path.join(thisdir, filename)))

        # os.listdir returns entries in arbitrary order; sort so that callers
        # (e.g. load) see the files in chronological order.
        found.sort()
        return [fullname for _, _, fullname in found]

    #------------------------------------------------------------------------------
    #           convert_time_entry
    #------------------------------------------------------------------------------
    def convert_time_entry(self, usert) -> Union[datetime.datetime, None]:
        """
        Convert a user supplied time to a ``datetime.datetime``.

        :param usert: None, a ``datetime.datetime`` (both returned unchanged),
            a string or a ``numpy.datetime64`` (both parsed with dateutil).
        :return: The time as a ``datetime.datetime``, or None if ``usert`` is None.
        :raises RuntimeError: If ``usert`` is of any other type.
        """
        if (usert is None) or isinstance(usert, datetime.datetime):
            return usert
        if isinstance(usert, str):
            return dateutil.parser.parse(usert)
        if isinstance(usert, np.datetime64):
            return dateutil.parser.parse(np.datetime_as_string(usert))
        raise RuntimeError('unsupported time type')

    #------------------------------------------------------------------------------
    #           load
    #------------------------------------------------------------------------------
    def load(self,
             tmin: Union[datetime.datetime, np.datetime64, str, None] = None,
             tmax: Union[datetime.datetime, np.datetime64, str, None] = None):
        """
        Open all files within the given time range and join them along ``time``.

        :param tmin: Start of the time range, in any form accepted by
            :meth:`convert_time_entry`. None means no lower limit other than
            the default of :meth:`load_list_of_files`.
        :param tmax: End of the time range, same conventions as ``tmin``.
        :return: None if no file matches, the dataset itself if exactly one
            file matches, otherwise the datasets concatenated along the
            ``time`` dimension.
        """
        ttmin = self.convert_time_entry(tmin)
        ttmax = self.convert_time_entry(tmax)
        files = self.load_list_of_files(tmin=ttmin, tmax=ttmax)
        datasets = [xarray.open_dataset(name, decode_cf=True, group=self.groupname) for name in files]
        if not datasets:
            return None
        if len(datasets) == 1:
            return datasets[0]
        # One concat over all datasets instead of re-concatenating the growing
        # result for every additional file.
        return xarray.concat(datasets, dim="time")