Source code for services

from typing import NamedTuple, Sequence
from math import radians, cos, sin
import numpy as np
import xarray as xr
import sasktran as sk
from scipy.interpolate import interp1d
from osirisl1services.spatialrotation import Vector, UnitQuaternion, TaitBryanZYX, ECI
from osirisl1services.readlevel1 import open_level1_attitude, open_level1_spectrograph
from astropy.time import Time
from astropy.coordinates import EarthLocation, AltAz, get_sun
import warnings
import logging


[docs]class PixelParams(NamedTuple): dy: float dz: float y0: float z0: float
# Spectrograph alignment data from OsirisConfigurationDatabase/flightduration/osoptics.ini # The rotation is expressed as a euler angles "zyx" in degrees. # 2005-01-03 NDL Initialize with the Jupiter alignment measurements. SPECTROGRAPH_ROTATION_TB_ZYX = TaitBryanZYX(roll=radians(0.0337), pitch=radians(-0.0903), yaw=radians(-0.15810304)) SPECTROGRAPH_ROTATION_CF = SPECTROGRAPH_ROTATION_TB_ZYX.to_quaternion() # InfraRed alignment data from OsirisConfigurationDatabase/flightduration/iroptics.ini # The rotation is expressed as a euler angles "zyx" in degrees. IR_ROTATION_TB_ZYX = TaitBryanZYX(roll=radians(0.0), pitch=radians(-0.06167), yaw=radians(0.0)) IR_ROTATION_CF = IR_ROTATION_TB_ZYX.to_quaternion() MINUTES_TO_RADIANS = radians(1.0 / 60.0) # IR1 Optics IR1_PIXEL_PARAMS = PixelParams(dy=-1.1931386 * MINUTES_TO_RADIANS, dz=0.0, y0=105.104, z0=0.0) # IR2 Optics IR2_PIXEL_PARAMS = PixelParams(dy=-1.2046809 * MINUTES_TO_RADIANS, dz=0.0, y0=109.289, z0=0.0) # IR3 Optics IR3_PIXEL_PARAMS = PixelParams(dy=-1.2059574 * MINUTES_TO_RADIANS, dz=0.0, y0=109.492, z0=0.0) IR_REFERENCE_PIXEL = 72 def apparent_solar_time(time, lat, lon): time = Time(time, format='mjd') sun = get_sun(time) location = EarthLocation(lat=lat, lon=lon) ra = location.get_gcrs(obstime=time).ra return (12 - (sun.ra - ra).value / 360 * 24) % 24
[docs]def get_orbit_number(mjd): """ Returns the Odin orbit number for an mjd. """ # Build an interpolating function to get the approximate orbit number. This needs to be kept up to date. orbits = np.array( [3600, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000, 35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000, 49000, 50000, 51000, 52000, 53000, 54000, 55000, 56000, 57000, 58000, 59000, 60000, 61000, 62000, 63000, 64000, 65000, 66000, 67000, 68000, 69000, 70000, 71000, 72000, 73000, 74000, 75000, 76000, 77000, 78000, 79000, 80000, 81000, 82000, 83000, 84000, 85000, 86000, 87000, 88000, 89010, 90000, 91000, 92000, 93000, 94000, 95230, 96000, 97000, 98000, 99000, 104000]) mjds = np.array( [52202.91088701, 52229.83605832, 52297.1142319, 52364.33975869, 52431.52058405, 52498.67576773, 52565.80671044, 52632.90550315, 52699.98005496, 52767.03778265, 52834.0803944, 52901.11160113, 52968.13103777, 53035.13542833, 53102.12971671, 53169.11444501, 53236.09232352, 53303.06443793, 53370.02781706, 53436.98273076, 53503.93168309, 53570.87492376, 53637.81378604, 53704.74850202, 53771.67979563, 53838.60831498, 53905.53304735, 53972.45514862, 54039.37574322, 54106.29342354, 54173.20787839, 54240.1198607, 54307.02943404, 54373.9378738, 54440.84708995, 54507.75486961, 54574.66049933, 54641.56405143, 54708.46621467, 54775.36890638, 54842.27162308, 54909.17261061, 54976.07199555, 55042.96983743, 55109.86707528, 55176.76511127, 55243.66153186, 55310.554609, 55377.44450698, 55444.33222512, 55511.21910573, 55578.1036188, 55644.98409074, 55711.85592717, 55778.72147194, 55845.58222911, 55912.42797305, 55979.25316935, 56046.06553291, 56112.86547911, 56179.65617877, 56246.43608282, 56313.20212288, 56379.9559873, 56446.69682532, 56513.4248885, 56580.1437009, 56646.847799, 56713.52328793, 56780.16274797, 56846.77962105, 56913.38410717, 56979.9695577, 57046.51967139, 57113.03695558, 57179.52735469, 57246.00096125, 57312.46551237, 57378.9185779, 57445.35576321, 57511.78053948, 57578.1974532, 57644.60938976, 57711.01714241, 57777.420215, 57843.81835593, 57910.87561289, 57976.60167759, 58042.98862381, 58109.37255365, 58175.75282207, 58242.13034838, 58323.76983534, 58374.87506315, 58441.24529391, 58507.61426857, 58573.98075644, 58905.8131958]) closest_orbit = interp1d(mjds, orbits, kind=3, assume_sorted=True)(mjd).item() logging.debug(f'Trying closest orbit {closest_orbit}.') try: att = open_level1_attitude(closest_orbit) except LookupError: logging.debug(f'MJD not found in orbit {int(closest_orbit)}, checking neighbouring orbits.') try: if closest_orbit - np.floor(closest_orbit) < 0.5: att = open_level1_attitude(closest_orbit - 1) if att.StartMJD.min() < mjd < att.EndMJD.max(): logging.debug(f'MJD found in previous orbit.') return int(closest_orbit) - 1 else: raise LookupError(f'There is no attitude data for mjd {mjd}.') else: att = open_level1_attitude(closest_orbit + 1) if att.StartMJD.min() < mjd < att.EndMJD.max(): logging.debug(f'MJD found it next orbit.') return int(closest_orbit) + 1 else: raise LookupError(f'There is no attitude data for mjd {mjd}.') except LookupError: raise LookupError(f'There is no attitude data for mjd {mjd}.') try: if att.StartMJD.min() < mjd < att.EndMJD.max(): return int(closest_orbit) elif mjd <= att.StartMJD.min(): open_level1_attitude(closest_orbit - 1) logging.debug(f'MJD found in previous orbit.') return int(closest_orbit) - 1 elif mjd >= att.EndMJD.max(): open_level1_attitude(closest_orbit + 1) logging.debug(f'MJD found in next orbit.') return int(closest_orbit) + 1 except LookupError: raise LookupError(f'There is no attitude data for mjd {mjd}.')
def _interpolate_orbit_position_or_velocity(value: xr.DataArray, mjds): """ Private function to interpolate position or velocity onto a given set of mjds. Normalizes interpolated values by the norm so that points have a smooth distance or speed. """ value_interpolated = value.interp(mjdorbitstate=mjds) norm = value.reduce(np.linalg.norm, dim='xyz') norm_interpolated = norm.interp(mjdorbitstate=mjds) value_interpolated *= norm_interpolated / value_interpolated.reduce(np.linalg.norm, dim='xyz') return value_interpolated.rename({'mjdorbitstate': 'mjd'}) def _get_odin_position_or_velocity_eci(mjds: Sequence, kind, attitude=None): """ Private function to calculate Odin's position or velocity for a set of mjds. The ECI coordinate frame is specified in the SSC document SSAK31-7_April_2005.pdf This method produces results that are slightly different than the current c++ level1 services in that the c++ level1 services uses a kepler orbit to interpolate between the points and this method is simpler. A first order correction is done on the interpolated values in that it uses the distances (norm) to correct that the true path is curved and the linear interpolation is straight. I expect the error in this method produces inaccuracies that are much less than the precision in the actual satellite position. The currenct issue with the kepler orbit method and the reason the position values are not closer is that the kepler orbit method produces small step functions in the norm that are obviously not real. """ if kind not in ['position', 'velocity']: raise (KeyError(f"{_get_odin_position_or_velocity_eci.__name__} expects a kind of 'position' or 'velocity'")) if not attitude: mjds = list(mjds) first_orbit = get_orbit_number(mjds[0]) last_orbit = get_orbit_number(mjds[-1]) attitude_datasets = [] try: for orbit in range(first_orbit, last_orbit + 1): attitude_datasets.append(open_level1_attitude(orbit, load_pointing=True)) attitude = xr.concat(attitude_datasets, dim='mjd') except LookupError: raise LookupError(f'MJD sequence used in get_odin_position has a missing orbit at {orbit}.') if kind is 'position': return _interpolate_orbit_position_or_velocity( attitude.swap_dims({'mjd': 'mjdorbitstate'}).drop('mjd').position, mjds) else: return _interpolate_orbit_position_or_velocity( attitude.swap_dims({'mjd': 'mjdorbitstate'}).drop('mjd').velocity, mjds) def _eci_to_ecef(mjds, eci_values): return [ECI(*eci).to_geo(mjd) for eci, mjd in zip(eci_values, mjds)]
[docs]def get_odin_position_eci(mjds: Sequence, attitude=None): """ Calculate Odin's position for a set of mjds in ECI coordinates. """ return _get_odin_position_or_velocity_eci(mjds=mjds, kind='position', attitude=attitude)
[docs]def get_odin_velocity_eci(mjds: Sequence, attitude=None): """ Calculate Odin's velocity for a set of mjds in ECI coordinates. """ return _get_odin_position_or_velocity_eci(mjds=mjds, kind='velocity', attitude=attitude)
[docs]def get_odin_position_ecef(mjds: Sequence, attitude=None): """ Calculate Odin's position for a set of mjds in ECEF coordinates. """ position_eci = get_odin_position_eci(mjds=mjds, attitude=attitude) return _eci_to_ecef(mjds=mjds, eci_values=position_eci)
[docs]def get_odin_velocity_ecef(mjds: Sequence, attitude=None): """ Calculate Odin's velocity for a set of mjds in ECEF coordinates. """ velocity_eci = get_odin_velocity_eci(mjds=mjds, attitude=attitude) return _eci_to_ecef(mjds=mjds, eci_values=velocity_eci)
[docs]@xr.register_dataset_accessor('l1') class Level1Services(object): def __init__(self, xarray_ds): self._ds = xarray_ds self._attitude = None self._position_eci = None self._velocity_eci = None # Init these variables to the cached values if they exist. self._position_ecef = xarray_ds['position_ecef'] if 'position_ecef' in xarray_ds else None self._velocity_ecef = xarray_ds['velocity_ecef'] if 'velocity_ecef' in xarray_ds else None self._look_eci = xarray_ds['look_eci'] if 'look_eci' in xarray_ds else None self._look_ecef = xarray_ds['look_ecef'] if 'look_ecef' in xarray_ds else None self._latitude = xarray_ds['latitude'] if 'latitude' in xarray_ds else None self._longitude = xarray_ds['longitude'] if 'longitude' in xarray_ds else None self._altitude = xarray_ds['altitude'] if 'altitude' in xarray_ds else None self._sza = xarray_ds['sza'] if 'sza' in xarray_ds else None self._ssa = xarray_ds['ssa'] if 'ssa' in xarray_ds else None self._saa = xarray_ds['saa'] if 'saa' in xarray_ds else None self._satellite_azimuth_angle = xarray_ds['satellite_azimuth_angle'] \ if 'satellite_azimuth_angle' in xarray_ds else None self._apparent_solar_time = xarray_ds['apparent_solar_time'] if 'apparent_solar_time' in xarray_ds else None self._exposuremjd = self._ds.mjd + 0.5 * self._ds.exposureTime / 86400.0 @property def attitude(self): """ Load the corresponding satellite attitude data for the orbit. """ logging.debug('Loading level1 attitude data.') if self._position_eci is None: if 'orbit' in self._ds: orbit = self._ds.orbit.item() else: orbit = self._ds.scanno.item() // 1000 attitude1 = open_level1_attitude(orbit, load_pointing=True) if self._ds.mjd.min() >= attitude1.mjd.min(): self._attitude = attitude1 else: attitude1 = attitude1.drop('ScanNumber').drop( [var for var in attitude1.data_vars if 'mjd' not in attitude1[var].coords]) attitude0 = open_level1_attitude(orbit - 1, load_pointing=True) attitude0 = attitude0.drop('ScanNumber').drop( [var for var in attitude0.data_vars if 'mjd' not in attitude0[var].coords]) self._attitude = xr.concat([attitude0, attitude1], dim='mjd') # Correct bouncing attitude quaternions self._attitude.Qachieved.values = \ self._attitude.Qachieved * \ self._attitude.Qachieved.dot(self._attitude.Qachieved.shift(mjd=1), dims='1ijk').pipe(np.sign).cumprod() return self._attitude @property def position_eci(self): """ Calculate satellite position of a spectrograph / IR Dataset in ECI coordinates at mid-exposure time. """ logging.debug('Calculating level1 ECI position.') if self._position_eci is None: position = get_odin_position_eci(self._exposuremjd.values, self.attitude) self._position_eci = position.assign_coords(mjd=self._ds.mjd.values) return self._position_eci @property def position_ecef(self): """ Calculate satellite position of a spectrograph / IR Dataset in ECEF coordinates at mid-exposure time. """ logging.debug('Calculating level1 ECEF position.') if self._position_ecef is None: position_ecef = _eci_to_ecef(self._exposuremjd, self.position_eci) self._position_ecef = xr.DataArray(position_ecef, coords=[('mjd', self._ds.mjd.values), ('xyz', ['x', 'y', 'z'])]) return self._position_ecef @property def velocity_eci(self): """ Calculate satellite velocity for a spectrograph / IR Dataset in ECI coordinates at mid-exposure time. """ logging.debug('Calculating level1 ECI velocity.') if self._velocity_eci is None: velocity = get_odin_velocity_eci(self._exposuremjd.values, self.attitude) self._velocity_eci = velocity.assign_coords(mjd=self._ds.mjd.values) return self._velocity_eci @property def velocity_ecef(self): """ Calculate satellite velocity for a spectrograph / IR Dataset in ECEF coordinates at mid-exposure time. """ logging.debug('Calculating level1 ECEF position.') if self._velocity_ecef is None: velocity_ecef = _eci_to_ecef(self._exposuremjd, self.velocity_eci) self._velocity_ecef = xr.DataArray(velocity_ecef, coords=[('mjd', self._ds.mjd.values), ('xyz', ['x', 'y', 'z'])]) return self._velocity_ecef @property def look_eci(self): """ Calculate spectrograph / IR look vectors in ECI coordinates. """ logging.debug('Calculating level1 ECI look vectors.') if self._look_eci is None: # Load the quaternions quaternion = self.attitude.Qachieved if 'channel' in self._ds.data_vars: # it is an ir dataset pixel_dict = {1: IR1_PIXEL_PARAMS, 2: IR2_PIXEL_PARAMS, 3: IR3_PIXEL_PARAMS} channel = self._ds.channel.item() thetas = (self._ds.pixel + 0.5 - pixel_dict[channel].y0) * pixel_dict[channel].dy pixel_cfs = [Vector(cos(theta), sin(theta), 0.0) for theta in thetas] look = [(UnitQuaternion(*q).normalize() * IR_ROTATION_CF).rot_no_z(pixel_cfs) for q in quaternion.interp(mjd=self._exposuremjd.values).values] self._look_eci = xr.DataArray(np.array(look), coords=[('mjd', self._ds.mjd.values), ('pixel', range(128)), ('xyz', ['x', 'y', 'z'])]) else: # it is a spectrograph dataset look = [(UnitQuaternion(*q).normalize() * SPECTROGRAPH_ROTATION_CF).rot_unit_x() for q in quaternion.interp(mjd=self._exposuremjd.values).values] # Apply the RSAS shift to the spectrograph data if the shift was loaded in the spectrograph data. if 'rsas_shift' in self._ds.data_vars: if self._ds.rsas_shift.attrs['units'] == 'radians': shifted_look = [] theta = self._ds.data_vars['rsas_shift'] for (p, l) in zip(self.position_eci.values, look): rot_around = np.cross(l, p) rot_around /= np.linalg.norm(rot_around) rq = UnitQuaternion(cos(theta / 2.), *(sin(theta / 2.) * rot_around)) shifted_look.append(rq.rot([l])[0]) look = shifted_look elif self._ds.rsas_shift.attrs['units'] == 'm': shifted_look = [] geo = sk.Geodetic('WGS84') for (p, l) in zip(self.position_eci.values, look): # A better shift would also slightly change the latitude/longitude, but this is good enough geo.from_tangent_point(p, l) geo.from_lat_lon_alt(geo.latitude, geo.longitude, geo.altitude - float(self._ds.data_vars['rsas_shift'])) new_look = geo.location - p shifted_look.append(new_look / np.linalg.norm(new_look)) look = shifted_look self._look_eci = xr.DataArray(look, coords=[('mjd', self._ds.mjd.values), ('xyz', ['x', 'y', 'z'])]) return self._look_eci @property def look_ecef(self): """ Calculate spectrograph / IR look vectors in ECEF coordinates. """ logging.debug('Calculating level1 ECEF look vectors.') if self._look_ecef is None: if 'channel' in self._ds.data_vars: # it is an ir dataset look_ecef = [] for look_eci, mjd in zip(self.look_eci.values, self._exposuremjd.values): pixel_ecef = [ECI(*pixel_eci).to_geo(mjd) for pixel_eci in look_eci] look_ecef.append(pixel_ecef) self._look_ecef = xr.DataArray(np.array(look_ecef), coords=[('mjd', self._ds.mjd.values), ('pixel', range(128)), ('xyz', ['x', 'y', 'z'])]) else: # it is a spectrograph dataset look_ecef = [ECI(*eci).to_geo(mjd) for eci, mjd in zip(self.look_eci.values, self._exposuremjd.values)] self._look_ecef = xr.DataArray(look_ecef, coords=[('mjd', self._ds.mjd.values), ('xyz', ['x', 'y', 'z'])]) return self._look_ecef @property def look(self): """ Calculate spectrograph / IR look vectors. Returns the same result as look_eci. """ msg = 'Level 1 service l1.look will be deprecated in future versions. ' \ 'Use look_ecef or look_eci instead.' warnings.warn(msg, FutureWarning, stacklevel=2) return self.look_eci def _tangent_point(self): """ Calculate spectrograph / IR tangent information (lat, lon, altitude). """ logging.debug('Calculating level1 tangent points.') latitude = [] longitude = [] altitude = [] geo = sk.Geodetic('WGS84') if self.look_ecef.ndim == 3: # it is an ir dataset for (mjd, pos, look) in zip(self._exposuremjd.values, self.position_ecef.values, self.look_ecef.values): for pixellook in look: geo.from_tangent_point(Vector(*pos), Vector(*pixellook)) # Check that the tangent altitude is above the center of the Earth. if geo.altitude > -6000000.: latitude.append(geo.latitude) longitude.append(geo.longitude) altitude.append(geo.altitude) else: # return NaN latitude.append(np.nan) longitude.append(np.nan) altitude.append(np.nan) self._latitude = xr.DataArray(np.reshape(latitude, (-1, 128)), coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))]) self._latitude = self._latitude.sel(pixel=IR_REFERENCE_PIXEL).drop('pixel') self._longitude = xr.DataArray(np.reshape(longitude, (-1, 128)), coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))]) self._longitude = self._longitude.sel(pixel=IR_REFERENCE_PIXEL).drop('pixel') self._altitude = xr.DataArray(np.reshape(altitude, (-1, 128)), coords=[('mjd', self._ds.mjd.values), ('pixel', range(128))]) else: # it is a spectrograph dataset for (mjd, pos, look) in zip(self._exposuremjd.values, self.position_ecef.values, self.look_ecef.values): geo.from_tangent_point(Vector(*pos), Vector(*look)) latitude.append(geo.latitude) longitude.append(geo.longitude) altitude.append(geo.altitude) self._latitude = xr.DataArray(latitude, coords=[('mjd', self._ds.mjd.values)]) self._longitude = xr.DataArray(longitude, coords=[('mjd', self._ds.mjd.values)]) self._altitude = xr.DataArray(altitude, coords=[('mjd', self._ds.mjd.values)]) @property def latitude(self): if self._latitude is None: self._tangent_point() return self._latitude @property def longitude(self): if self._longitude is None: self._tangent_point() return self._longitude @property def altitude(self): if self._altitude is None: self._tangent_point() return self._altitude @property def sza(self): """ Calculate spectrograph / IR solar zenith angle. """ logging.debug('Calculating level1 solar zenith angles.') if self._sza is None: sza = [] if self.look_ecef.ndim == 3: # it is an ir dataset altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL) else: # it is a spectrograph dataset altitude = self.altitude for (mjd, lat, lon, height) in zip(self._exposuremjd.values, self.latitude.values, self.longitude.values, altitude.values): loc = EarthLocation(lat=lat, lon=lon, height=height) time = Time(mjd, format='mjd') sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc)) sza.append(90.0 - sun.alt.value) self._sza = xr.DataArray(sza, coords=[('mjd', self._ds.mjd.values)]) return self._sza @property def ssa(self): """ Calculate spectrograph / IR solar scattering angle. """ logging.debug('Calculating level1 solar scattering angles.') if self._ssa is None: ssa = [] if self.look_ecef.ndim == 3: # it is an ir dataset altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL) else: # it is a spectrograph dataset altitude = self.altitude for (mjd, pos, lat, lon, height) in zip( self._exposuremjd.values, self.position_ecef.values, self.latitude.values, self.longitude.values, altitude.values): loc = EarthLocation(lat=lat, lon=lon, height=height) time = Time(mjd, format='mjd') sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc)) sat = EarthLocation(*pos, unit='m') sat = sat.get_itrs(obstime=time).transform_to(AltAz(obstime=time, location=loc)) ssa.append(180.0 - sun.separation(sat).value) self._ssa = xr.DataArray(ssa, coords=[('mjd', self._ds.mjd.values)]) return self._ssa @property def saa(self): """ Calculate the solar azimuth angle at the OSIRIS tangent point. """ logging.debug('Calculating level1 solar azimuth angle.') if self._saa is None: saa = [] if self.look_ecef.ndim == 3: # it is an ir dataset altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL) else: # it is a spectrograph dataset altitude = self.altitude for (mjd, pos, lat, lon, height) in zip( self._exposuremjd.values, self.position_ecef, self.latitude.values, self.longitude.values, altitude.values): loc = EarthLocation(lat=lat, lon=lon, height=height) time = Time(mjd, format='mjd') sun = get_sun(time).transform_to(AltAz(obstime=time, location=loc)) saa.append(sun.az.value) self._saa = xr.DataArray(saa, coords=[('mjd', self._ds.mjd.values)]) return self._saa @property def satellite_azimuth_angle(self): """ Calculate spectrograph / IR look azimuth angle. """ logging.debug('Calculating level1 satellite look direction azimuth angle.') if self._satellite_azimuth_angle is None: sat_aa = [] if self.look_ecef.ndim == 3: # it is an ir dataset altitude = self.altitude.sel(pixel=IR_REFERENCE_PIXEL) else: # it is a spectrograph dataset altitude = self.altitude for (mjd, pos, lat, lon, height) in zip( self._exposuremjd.values, self.position_ecef.values, self.latitude.values, self.longitude.values, altitude.values): loc = EarthLocation(lat=lat, lon=lon, height=height) time = Time(mjd, format='mjd') sat = EarthLocation(*pos, unit='m') sat = sat.get_itrs(obstime=time).transform_to(AltAz(obstime=time, location=loc)) sat_aa.append((sat.az.value - 180) % 360.0) self._satellite_azimuth_angle = xr.DataArray(sat_aa, coords=[('mjd', self._ds.mjd.values)]) return self._satellite_azimuth_angle @property def apparent_solar_time(self): if self._apparent_solar_time is None: if self.look_ecef.ndim == 3: # it is an ir dataset latitude = self.latitude.sel(pixel=72) longitude = self.longitude.sel(pixel=72) else: # it is a spectrograph dataset latitude = self.latitude longitude = self.longitude self._apparent_solar_time = xr.apply_ufunc( apparent_solar_time, self._exposuremjd, latitude.values, longitude.values) return self._apparent_solar_time def monotonic_altitude_slice(self): if 'channel' in self._ds.data_vars: # it is an ir dataset raise (NotImplementedError('Monotonic altitude slice is not implemented for IR data.')) diffs = self.altitude.diff(dim='mjd') if self.altitude.values[-1] > self.altitude.values[0]: monotonic = diffs > 0 else: monotonic = diffs < 0 if monotonic.all(): return slice(None, None) else: first, last = None, None if not monotonic[0]: for (i, b) in enumerate(monotonic): if b: first = i break if not monotonic[-1]: for (i, b) in enumerate(reversed(monotonic)): if b: last = -i break if not monotonic.isel(mjd=slice(first, last)).all(): raise (ValueError('Scan altitudes cannot be made to monotonically increasing/decreasing')) logging.debug(f'Trimming from {slice(first, last)}') return slice(first, last)
[docs]def get_scannos_in_orbit_below_sza(orbit: int, sza: float, target_altitude: float = 25000): """ For a given orbit return an iterator of the scans whose tangent point is above a configurable horizon. Useful for retrieval code. :param Orbit orbit: The orbit class handling the data retrieval, specified by orbit number :param float sza: Reference solar zenith angle :param target_altitude: Altitude to take solar zenith angle reference from, default is 25000m """ try: attitude = open_level1_attitude(orbit=orbit, load_pointing=True) except LookupError: # No valid orbits return for scanno in attitude.ScanNumber.values: logging.debug(f'Checking scan {scanno} to see if it is below a sza of {sza}') try: spectrograph = open_level1_spectrograph(scanno=scanno) except LookupError: # No data for scan logging.debug(f'Scan {scanno} has no spectrograph data.') continue spectrograph['altitude'] = spectrograph.l1.altitude spectrograph['sza'] = spectrograph.l1.sza spectrograph = spectrograph.dropna(dim='mjd', subset=['altitude']).sortby('altitude') sza_at_target = spectrograph.swap_dims({'mjd': 'altitude'}).sza.sel(altitude=target_altitude, method='nearest') if sza_at_target < sza: logging.info(f'SZA for scan {scanno} is below {sza}, yielding scan.') yield scanno else: logging.info(f'SZA for scan {scanno} is not below {sza}, skipping scan.')