from typing import NamedTuple, Sequence
from math import radians, cos, sin
import numpy as np
import xarray as xr
import sasktran as sk
from scipy.interpolate import interp1d
from osirisl1services.spatialrotation import Vector, UnitQuaternion, TaitBryanZYX, ECI
from osirisl1services.readlevel1 import open_level1_attitude, open_level1_spectrograph
from astropy.time import Time
from astropy.coordinates import EarthLocation, AltAz, get_sun
import warnings
import logging
[docs]
class PixelParams(NamedTuple):
dy: float
dz: float
y0: float
z0: float
# Spectrograph alignment data from OsirisConfigurationDatabase/flightduration/osoptics.ini
# The rotation is expressed as a euler angles "zyx" in degrees.
# 2005-01-03 NDL Initialize with the Jupiter alignment measurements.
SPECTROGRAPH_ROTATION_TB_ZYX = TaitBryanZYX(roll=radians(0.0337), pitch=radians(-0.0903), yaw=radians(-0.15810304))
SPECTROGRAPH_ROTATION_CF = SPECTROGRAPH_ROTATION_TB_ZYX.to_quaternion()
# InfraRed alignment data from OsirisConfigurationDatabase/flightduration/iroptics.ini
# The rotation is expressed as a euler angles "zyx" in degrees.
IR_ROTATION_TB_ZYX = TaitBryanZYX(roll=radians(0.0), pitch=radians(-0.06167), yaw=radians(0.0))
IR_ROTATION_CF = IR_ROTATION_TB_ZYX.to_quaternion()
MINUTES_TO_RADIANS = radians(1.0 / 60.0)
# IR1 Optics
IR1_PIXEL_PARAMS = PixelParams(dy=-1.1931386 * MINUTES_TO_RADIANS, dz=0.0, y0=105.104, z0=0.0)
# IR2 Optics
IR2_PIXEL_PARAMS = PixelParams(dy=-1.2046809 * MINUTES_TO_RADIANS, dz=0.0, y0=109.289, z0=0.0)
# IR3 Optics
IR3_PIXEL_PARAMS = PixelParams(dy=-1.2059574 * MINUTES_TO_RADIANS, dz=0.0, y0=109.492, z0=0.0)
IR_REFERENCE_PIXEL = 72
def apparent_solar_time(time, lat, lon):
time = Time(time, format='mjd')
sun = get_sun(time)
location = EarthLocation(lat=lat, lon=lon)
ra = location.get_gcrs(obstime=time).ra
return (12 - (sun.ra - ra).value / 360 * 24) % 24
[docs]
def get_orbit_number(mjd):
""" Returns the Odin orbit number for an mjd. """
# Build an interpolating function to get the approximate orbit number. This needs to be kept up to date.
orbits = np.array(
[3600, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000, 19000,
20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000, 35000,
36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000, 49000, 50000, 51000,
52000, 53000, 54000, 55000, 56000, 57000, 58000, 59000, 60000, 61000, 62000, 63000, 64000, 65000, 66000, 67000,
68000, 69000, 70000, 71000, 72000, 73000, 74000, 75000, 76000, 77000, 78000, 79000, 80000, 81000, 82000, 83000,
84000, 85000, 86000, 87000, 88000, 89010, 90000, 91000, 92000, 93000, 94000, 95230, 96000, 97000, 98000,
99000, 104000])
mjds = np.array(
[52202.91088701, 52229.83605832, 52297.1142319, 52364.33975869, 52431.52058405, 52498.67576773, 52565.80671044,
52632.90550315, 52699.98005496, 52767.03778265, 52834.0803944, 52901.11160113, 52968.13103777, 53035.13542833,
53102.12971671, 53169.11444501, 53236.09232352, 53303.06443793, 53370.02781706, 53436.98273076, 53503.93168309,
53570.87492376, 53637.81378604, 53704.74850202, 53771.67979563, 53838.60831498, 53905.53304735, 53972.45514862,
54039.37574322, 54106.29342354, 54173.20787839, 54240.1198607, 54307.02943404, 54373.9378738, 54440.84708995,
54507.75486961, 54574.66049933, 54641.56405143, 54708.46621467, 54775.36890638, 54842.27162308, 54909.17261061,
54976.07199555, 55042.96983743, 55109.86707528, 55176.76511127, 55243.66153186, 55310.554609, 55377.44450698,
55444.33222512, 55511.21910573, 55578.1036188, 55644.98409074, 55711.85592717, 55778.72147194, 55845.58222911,
55912.42797305, 55979.25316935, 56046.06553291, 56112.86547911, 56179.65617877, 56246.43608282, 56313.20212288,
56379.9559873, 56446.69682532, 56513.4248885, 56580.1437009, 56646.847799, 56713.52328793, 56780.16274797,
56846.77962105, 56913.38410717, 56979.9695577, 57046.51967139, 57113.03695558, 57179.52735469, 57246.00096125,
57312.46551237, 57378.9185779, 57445.35576321, 57511.78053948, 57578.1974532, 57644.60938976, 57711.01714241,
57777.420215, 57843.81835593, 57910.87561289, 57976.60167759, 58042.98862381, 58109.37255365, 58175.75282207,
58242.13034838, 58323.76983534, 58374.87506315, 58441.24529391, 58507.61426857, 58573.98075644, 58905.8131958])
closest_orbit = interp1d(mjds, orbits, kind=3, assume_sorted=True)(mjd).item()
logging.debug(f'Trying closest orbit {closest_orbit}.')
try:
att = open_level1_attitude(closest_orbit)
except LookupError:
logging.debug(f'MJD not found in orbit {int(closest_orbit)}, checking neighbouring orbits.')
try:
if closest_orbit - np.floor(closest_orbit) < 0.5:
att = open_level1_attitude(closest_orbit - 1)
if att.StartMJD.min() < mjd < att.EndMJD.max():
logging.debug(f'MJD found in previous orbit.')
return int(closest_orbit) - 1
else:
raise LookupError(f'There is no attitude data for mjd {mjd}.')
else:
att = open_level1_attitude(closest_orbit + 1)
if att.StartMJD.min() < mjd < att.EndMJD.max():
logging.debug(f'MJD found it next orbit.')
return int(closest_orbit) + 1
else:
raise LookupError(f'There is no attitude data for mjd {mjd}.')
except LookupError:
raise LookupError(f'There is no attitude data for mjd {mjd}.')
try:
if att.StartMJD.min() < mjd < att.EndMJD.max():
return int(closest_orbit)
elif mjd <= att.StartMJD.min():
open_level1_attitude(closest_orbit - 1)
logging.debug(f'MJD found in previous orbit.')
return int(closest_orbit) - 1
elif mjd >= att.EndMJD.max():
open_level1_attitude(closest_orbit + 1)
logging.debug(f'MJD found in next orbit.')
return int(closest_orbit) + 1
except LookupError:
raise LookupError(f'There is no attitude data for mjd {mjd}.')
def _interpolate_orbit_position_or_velocity(value: xr.DataArray, mjds):
""" Private function to interpolate position or velocity onto a given set of mjds. Normalizes interpolated values
by the norm so that points have a smooth distance or speed. """
value_interpolated = value.interp(mjdorbitstate=mjds)
norm = value.reduce(np.linalg.norm, dim='xyz')
norm_interpolated = norm.interp(mjdorbitstate=mjds)
value_interpolated *= norm_interpolated / value_interpolated.reduce(np.linalg.norm, dim='xyz')
return value_interpolated.rename({'mjdorbitstate': 'mjd'})
def _get_odin_position_or_velocity_eci(mjds: Sequence, kind, attitude=None):
""" Private function to calculate Odin's position or velocity for a set of mjds.
The ECI coordinate frame is specified in the SSC document SSAK31-7_April_2005.pdf
This method produces results that are slightly different than the current c++ level1 services in that the
c++ level1 services uses a kepler orbit to interpolate between the points and this method is simpler.
A first order correction is done on the interpolated values in that it uses the distances (norm) to
correct that the true path is curved and the linear interpolation is straight. I expect the error in this
method produces inaccuracies that are much less than the precision in the actual satellite position.
The currenct issue with the kepler orbit method and the reason the position values are not closer is that
the kepler orbit method produces small step functions in the norm that are obviously not real. """
if kind not in ['position', 'velocity']:
raise (KeyError(f"{_get_odin_position_or_velocity_eci.__name__} expects a kind of 'position' or 'velocity'"))
if not attitude:
mjds = list(mjds)
first_orbit = get_orbit_number(mjds[0])
last_orbit = get_orbit_number(mjds[-1])
attitude_datasets = []
try:
for orbit in range(first_orbit, last_orbit + 1):
attitude_datasets.append(open_level1_attitude(orbit, load_pointing=True))
attitude = xr.concat(attitude_datasets, dim='mjd')
except LookupError:
raise LookupError(f'MJD sequence used in get_odin_position has a missing orbit at {orbit}.')
if kind is 'position':
return _interpolate_orbit_position_or_velocity(
attitude.swap_dims({'mjd': 'mjdorbitstate'}).drop('mjd').position, mjds)
else:
return _interpolate_orbit_position_or_velocity(
attitude.swap_dims({'mjd': 'mjdorbitstate'}).drop('mjd').velocity, mjds)
def _eci_to_ecef(mjds, eci_values):
return [ECI(*eci).to_geo(mjd) for eci, mjd in zip(eci_values, mjds)]
[docs]
def get_odin_position_eci(mjds: Sequence, attitude=None):
""" Calculate Odin's position for a set of mjds in ECI coordinates. """
return _get_odin_position_or_velocity_eci(mjds=mjds, kind='position', attitude=attitude)
[docs]
def get_odin_velocity_eci(mjds: Sequence, attitude=None):
""" Calculate Odin's velocity for a set of mjds in ECI coordinates. """
return _get_odin_position_or_velocity_eci(mjds=mjds, kind='velocity', attitude=attitude)
[docs]
def get_odin_position_ecef(mjds: Sequence, attitude=None):
""" Calculate Odin's position for a set of mjds in ECEF coordinates. """
position_eci = get_odin_position_eci(mjds=mjds, attitude=attitude)
return _eci_to_ecef(mjds=mjds, eci_values=position_eci)
[docs]
def get_odin_velocity_ecef(mjds: Sequence, attitude=None):
""" Calculate Odin's velocity for a set of mjds in ECEF coordinates. """
velocity_eci = get_odin_velocity_eci(mjds=mjds, attitude=attitude)
return _eci_to_ecef(mjds=mjds, eci_values=velocity_eci)
[docs]
@xr.register_dataset_accessor('l1')
class Level1Services(object):
def __init__(self, xarray_ds):
self._ds = xarray_ds
self._attitude = None
self._position_eci = None
self._velocity_eci = None
# Init these variables to the cached values if they exist.
self._position_ecef = xarray_ds['position_ecef'] if 'position_ecef' in xarray_ds else None
self._velocity_ecef = xarray_ds['velocity_ecef'] if 'velocity_ecef' in xarray_ds else None
self._look_eci = xarray_ds['look_eci'] if 'look_eci' in xarray_ds else None
self._look_ecef = xarray_ds['look_ecef'] if 'look_ecef' in xarray_ds else None
self._latitude = xarray_ds['latitude'] if 'latitude' in xarray_ds else None
self._longitude = xarray_ds['longitude'] if 'longitude' in xarray_ds else None
self._altitude = xarray_ds['altitude'] if 'altitude' in xarray_ds else None
self._sza = xarray_ds['sza'] if 'sza' in xarray_ds else None
self._ssa = xarray_ds['ssa'] if 'ssa' in xarray_ds else None
self._saa = xarray_ds['saa'] if 'saa' in xarray_ds else None
self._satellite_azimuth_angle = xarray_ds['satellite_azimuth_angle'] \
if 'satellite_azimuth_angle' in xarray_ds else None
self._apparent_solar_time = xarray_ds['apparent_solar_time'] if 'apparent_solar_time' in xarray_ds else None
self._exposuremjd = self._ds.mjd + 0.5 * self._ds.exposureTime / 86400.0
@property
def attitude(self):
""" Load the corresponding satellite attitude data for the orbit. """
logging.debug('Loading level1 attitude data.')
if self._position_eci is None:
if 'orbit' in self._ds:
orbit = self._ds.orbit.item()
else:
orbit = self._ds.scanno.item() // 1000
attitude1 = open_level1_attitude(orbit, load_pointing=True)
if self._ds.mjd.min() >= attitude1.mjd.min():
self._attitude = attitude1
else:
attitude1 = attitude1.drop('ScanNumber').drop(
[var for var in attitude1.data_vars if 'mjd' not in attitude1[var].coords])
attitude0 = open_level1_attitude(orbit - 1, load_pointing=True)
attitude0 = attitude0.drop('ScanNumber').drop(
[var for var in attitude0.data_vars if 'mjd' not in attitude0[var].coords])
self._attitude = xr.concat([attitude0, attitude1], dim='mjd')
# Correct bouncing attitude quaternions
self._attitude.Qachieved.values = \
self._attitude.Qachieved * \
self._attitude.Qachieved.dot(self._attitude.Qachieved.shift(mjd=1), dims='1ijk').pipe(np.sign).cumprod()
return self._attitude
@property
def position_eci(self):
""" Calculate satellite position of a spectrograph / IR Dataset in ECI coordinates at mid-exposure time. """
logging.debug('Calculating level1 ECI position.')
if self._position_eci is None:
position = get_odin_position_eci(self._exposuremjd.values, self.attitude)
self._position_eci = position.assign_coords(mjd=self._ds.mjd.values)
return self._position_eci
@property
def position_ecef(self):
""" Calculate satellite position of a spectrograph / IR Dataset in ECEF coordinates at mid-exposure time. """
logging.debug('Calculating level1 ECEF position.')
if self._position_ecef is None:
position_ecef = _eci_to_ecef(self._exposuremjd, self.position_eci)
self._position_ecef = xr.DataArray(position_ecef, coords=[('mjd', self._ds.mjd.values),
('xyz', ['x', 'y', 'z'])])
return self._position_ecef
@property
def velocity_eci(self):
""" Calculate satellite velocity for a spectrograph / IR Dataset in ECI coordinates at mid-exposure time. """
logging.debug('Calculating level1 ECI velocity.')
if self._velocity_eci is None:
velocity = get_odin_velocity_eci(self._exposuremjd.values, self.attitude)
self._velocity_eci = velocity.assign_coords(mjd=self._ds.mjd.values)
return self._velocity_eci
@property
def velocity_ecef(self):
""" Calculate satellite velocity for a spectrograph / IR Dataset in ECEF coordinates at mid-exposure time. """
logging.debug('Calculating level1 ECEF position.')
if self._velocity_ecef is None:
velocity_ecef = _eci_to_ecef(self._exposuremjd, self.velocity_eci)
self._velocity_ecef = xr.DataArray(velocity_ecef, coords=[('mjd', self._ds.mjd.values),
('xyz', ['x', 'y', 'z'])])
return self._velocity_ecef
@property
def look_eci(self):
""" Calculate spectrograph / IR look vectors in ECI coordinates. """
logging.debug('Calculating level1 ECI look vectors.')
if self._look_eci is None:
# Load the quaternions
quaternion = self.attitude.Qachieved
if 'channel' in self._ds.data_vars: # it is an ir dataset
pixel_dict = {1: IR1_PIXEL_PARAMS, 2: IR2_PIXEL_PARAMS, 3: IR3_PIXEL_PARAMS}
channel = self._ds.channel.item()
thetas = (self._ds.pixel + 0.5 - pixel_dict[channel].y0) * pixel_dict[channel].dy
pixel_cfs = [Vector(cos(theta), sin(theta), 0.0) for theta in thetas]
look = [(UnitQuaternion(*q).normalize() * IR_ROTATION_CF).rot_no_z(pixel_cfs)
for q in quaternion.interp(mjd=self._exposuremjd.values).values]
self._look_eci = xr.DataArray(np.array(look),
coords=[('mjd', self._ds.mjd.values),
('pixel', range(128)),
('xyz', ['x', 'y', 'z'])])
else: # it is a spectrograph dataset
look = [(UnitQuaternion(*q).normalize() * SPECTROGRAPH_ROTATION_CF).rot_unit_x()
for q in quaternion.interp(mjd=self._exposuremjd.values).values]
# Apply the RSAS shift to the spectrograph data if the shift was loaded in the spectrograph data.
if 'rsas_shift' in self._ds.data_vars:
if self._ds.rsas_shift.attrs['units'] == 'radians':
shifted_look = []
theta = self._ds.data_vars['rsas_shift']
for (p, l) in zip(self.position_eci.values, look):
rot_around = np.cross(l, p)
rot_around /= np.linalg.norm(rot_around)
rq = UnitQuaternion(cos(theta / 2.), *(sin(theta / 2.) * rot_around))
shifted_look.append(rq.rot([l])[0])
look = shifted_look
elif self._ds.rsas_shift.attrs['units'] == 'm':
shifted_look = []
geo = sk.Geodetic('WGS84')
for (p, l) in zip(self.position_eci.values, look):
# A better shift would also slightly change the latitude/longitude, but this is good enough
geo.from_tangent_point(p, l)
geo.from_lat_lon_alt(geo.latitude, geo.longitude,
geo.altitude - float(self._ds.data_vars['rsas_shift']))
new_look = geo.location - p
shifted_look.append(new_look / np.linalg.norm(new_look))
look = shifted_look
self._look_eci = xr.DataArray(look, coords=[('mjd', self._ds.mjd.values), ('xyz', ['x', 'y', 'z'])])
return self._look_eci
@property
def look_ecef(self):
""" Calculate spectrograph / IR look vectors in ECEF coordinates. """
logging.debug('Calculating level1 ECEF look vectors.')
if self._look_ecef is None:
if 'channel' in self._ds.data_vars: # it is an ir dataset
look_ecef = []
for look_eci, mjd in zip(self.look_eci.values, self._exposuremjd.values):
pixel_ecef = [ECI(*pixel_eci).to_geo(mjd) for pixel_eci in look_eci]
look_ecef.append(pixel_ecef)
self._look_ecef = xr.DataArray(np.array(look_ecef),
coords=[('mjd', self._ds.mjd.values),
('pixel', range(128)),
('xyz', ['x', 'y', 'z'])])
else: # it is a spectrograph dataset
look_ecef = [ECI(*eci).to_geo(mjd) for eci, mjd in zip(self.look_eci.values, self._exposuremjd.values)]
self._look_ecef = xr.DataArray(look_ecef, coords=[('mjd', self._ds.mjd.values),
('xyz', ['x', 'y', 'z'])])
return self._look_ecef
@property
def look(self):
""" Calculate spectrograph / IR look vectors.
Returns the same result as look_eci. """
msg = 'Level 1 service l1.look will be deprecated in future versions. ' \
'Use look_ecef or look_eci instead.'
warnings.warn(msg, FutureWarning, stacklevel=2)
return self.look_eci
def _tangent_point(self):
""" Calculate spectrograph / IR tangent information (lat, lon, altitude). """
logging.debug('Calculating level1 tangent points.')
latitude = []
longitude = []
altitude = []
geo = sk.Geodetic('WGS84')
if self.look_ecef.ndim == 3: # it is an ir dataset
for (mjd, pos, look) in zip(self._exposuremjd.values, self.position_ecef.values, self.look_ecef.values):
for pixellook in look:
geo.from_tangent_point(Vector(*pos), Vector(*pixellook))
# Check that the tangent altitude is above the center of the Earth.
if geo.altitude > -6000000.:
latitude.append(geo.latitude)
longitude.append(geo.longitude)
altitude.append(geo.altitude)
else: # return NaN
latitude.append(np.nan)
longitude.append(np.nan)
altitude.append(np.nan)
self._latitude = xr.DataArray(np.reshape(latitude, (-1, 128)),
coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))])
self._latitude = self._latitude.sel(pixel=IR_REFERENCE_PIXEL).drop('pixel')
self._longitude = xr.DataArray(np.reshape(longitude, (-1, 128)),
coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))])
self._longitude = self._longitude.sel(pixel=IR_REFERENCE_PIXEL).drop('pixel')
self._altitude = xr.DataArray(np.reshape(altitude, (-1, 128)),
coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))])
else: # it is a spectrograph dataset
for (mjd, pos, look) in zip(self._exposuremjd.values, self.position_ecef.values, self.look_ecef.values):
geo.from_tangent_point(Vector(*pos), Vector(*look))
latitude.append(geo.latitude)
longitude.append(geo.longitude)
altitude.append(geo.altitude)
self._latitude = xr.DataArray(latitude, coords=[('mjd', self._ds.mjd.values)])
self._longitude = xr.DataArray(longitude, coords=[('mjd', self._ds.mjd.values)])
self._altitude = xr.DataArray(altitude, coords=[('mjd', self._ds.mjd.values)])
@property
def latitude(self):
if self._latitude is None:
self._tangent_point()
return self._latitude
@property
def longitude(self):
if self._longitude is None:
self._tangent_point()
return self._longitude
@property
def altitude(self):
if self._altitude is None:
self._tangent_point()
return self._altitude
@property
def sza(self):
""" Calculate spectrograph / IR solar zenith angle. """
logging.debug('Calculating level1 solar zenith angles.')
if self._sza is None:
sza = []
if self.look_ecef.ndim == 3: # it is an ir dataset
altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL)
else: # it is a spectrograph dataset
altitude = self.altitude
for (mjd, lat, lon, height) in zip(self._exposuremjd.values, self.latitude.values,
self.longitude.values, altitude.values):
loc = EarthLocation(lat=lat, lon=lon, height=height)
time = Time(mjd, format='mjd')
sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc))
sza.append(90.0 - sun.alt.value)
self._sza = xr.DataArray(sza, coords=[('mjd', self._ds.mjd.values)])
return self._sza
@property
def ssa(self):
""" Calculate spectrograph / IR solar scattering angle. """
logging.debug('Calculating level1 solar scattering angles.')
if self._ssa is None:
ssa = []
if self.look_ecef.ndim == 3: # it is an ir dataset
altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL)
else: # it is a spectrograph dataset
altitude = self.altitude
for (mjd, pos, lat, lon, height) in zip(
self._exposuremjd.values, self.position_ecef.values,
self.latitude.values, self.longitude.values, altitude.values):
loc = EarthLocation(lat=lat, lon=lon, height=height)
time = Time(mjd, format='mjd')
sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc))
sat = EarthLocation(*pos, unit='m')
sat = sat.get_itrs(obstime=time).transform_to(AltAz(obstime=time, location=loc))
ssa.append(180.0 - sun.separation(sat).value)
self._ssa = xr.DataArray(ssa, coords=[('mjd', self._ds.mjd.values)])
return self._ssa
@property
def saa(self):
""" Calculate the solar azimuth angle at the OSIRIS tangent point. """
logging.debug('Calculating level1 solar azimuth angle.')
if self._saa is None:
saa = []
if self.look_ecef.ndim == 3: # it is an ir dataset
altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL)
else: # it is a spectrograph dataset
altitude = self.altitude
for (mjd, pos, lat, lon, height) in zip(
self._exposuremjd.values, self.position_ecef,
self.latitude.values, self.longitude.values, altitude.values):
loc = EarthLocation(lat=lat, lon=lon, height=height)
time = Time(mjd, format='mjd')
sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc))
saa.append(sun.az.value)
self._saa = xr.DataArray(saa, coords=[('mjd', self._ds.mjd.values)])
return self._saa
@property
def satellite_azimuth_angle(self):
""" Calculate spectrograph / IR look azimuth angle. """
logging.debug('Calculating level1 satellite look direction azimuth angle.')
if self._satellite_azimuth_angle is None:
sat_aa = []
if self.look_ecef.ndim == 3: # it is an ir dataset
altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL)
else: # it is a spectrograph dataset
altitude = self.altitude
for (mjd, pos, lat, lon, height) in zip(
self._exposuremjd.values, self.position_ecef.values,
self.latitude.values, self.longitude.values, altitude.values):
loc = EarthLocation(lat=lat, lon=lon, height=height)
time = Time(mjd, format='mjd')
sat = EarthLocation(*pos, unit='m')
sat = sat.get_itrs(obstime=time).transform_to(AltAz(obstime=time, location=loc))
sat_aa.append((sat.az.value - 180) % 360.0)
self._satellite_azimuth_angle = xr.DataArray(sat_aa, coords=[('mjd', self._ds.mjd.values)])
return self._satellite_azimuth_angle
@property
def apparent_solar_time(self):
if self._apparent_solar_time is None:
if self.look_ecef.ndim == 3: # it is an ir dataset
latitude = self.latitude.sel(pixel=72)
longitude = self.longitude.sel(pixel=72)
else: # it is a spectrograph dataset
latitude = self.latitude
longitude = self.longitude
self._apparent_solar_time = xr.apply_ufunc(
apparent_solar_time, self._exposuremjd, latitude.values, longitude.values)
return self._apparent_solar_time
def monotonic_altitude_slice(self):
if 'channel' in self._ds.data_vars: # it is an ir dataset
raise (NotImplementedError('Monotonic altitude slice is not implemented for IR data.'))
diffs = self.altitude.diff(dim='mjd')
if self.altitude.values[-1] > self.altitude.values[0]:
monotonic = diffs > 0
else:
monotonic = diffs < 0
if monotonic.all():
return slice(None, None)
else:
first, last = None, None
if not monotonic[0]:
for (i, b) in enumerate(monotonic):
if b:
first = i
break
if not monotonic[-1]:
for (i, b) in enumerate(reversed(monotonic)):
if b:
last = -i
break
if not monotonic.isel(mjd=slice(first, last)).all():
raise (ValueError('Scan altitudes cannot be made to monotonically increasing/decreasing'))
logging.debug(f'Trimming from {slice(first, last)}')
return slice(first, last)
[docs]
def get_scannos_in_orbit_below_sza(orbit: int, sza: float, target_altitude: float = 25000):
""" For a given orbit return an iterator of the scans whose tangent point is above a configurable horizon.
Useful for retrieval code.
:param Orbit orbit: The orbit class handling the data retrieval, specified by orbit number
:param float sza: Reference solar zenith angle
:param target_altitude: Altitude to take solar zenith angle reference from, default is 25000m
"""
try:
attitude = open_level1_attitude(orbit=orbit, load_pointing=True)
except LookupError: # No valid orbits
return
for scanno in attitude.ScanNumber.values:
logging.debug(f'Checking scan {scanno} to see if it is below a sza of {sza}')
try:
spectrograph = open_level1_spectrograph(scanno=scanno)
except LookupError: # No data for scan
logging.debug(f'Scan {scanno} has no spectrograph data.')
continue
spectrograph['altitude'] = spectrograph.l1.altitude
spectrograph['sza'] = spectrograph.l1.sza
spectrograph = spectrograph.dropna(dim='mjd', subset=['altitude']).sortby('altitude')
sza_at_target = spectrograph.swap_dims({'mjd': 'altitude'}).sza.sel(altitude=target_altitude, method='nearest')
if sza_at_target < sza:
logging.info(f'SZA for scan {scanno} is below {sza}, yielding scan.')
yield scanno
else:
logging.info(f'SZA for scan {scanno} is not below {sza}, skipping scan.')