import numpy as np
from collections import namedtuple
from scipy.optimize import fsolve, least_squares
from osirisl1services.blurreddata import SAO2010, O3BDM
from osirisl1services.readlevel1 import open_level1_spectrograph
from osirisl1services.services import Level1Services
import logging
CENTRAL_WAVELENGTH = 335.0
SIGMA_TO_FWHM = np.sqrt(8.0 * np.log(2.0))
PSF_results = namedtuple('PSF_results', ['fwhm', 'fwhm_error', 'shift', 'shift_error'])
# If debugging, use faster to load solar and o3 interps
if logging.getLogger().getEffectiveLevel() == logging.DEBUG: # pragma: no cover
logging.warning('Using debug versions of the solar and o3 psf interp objects.')
solar = SAO2010(sigma=np.arange(.35, .52, 0.03))
o3 = O3BDM(sigma=np.arange(.35, .52, 0.03))
else:
solar = SAO2010()
o3 = O3BDM()
# Target function
[docs]
def model_sig_mu(sig, mu, p, wv):
"""Simple blur and shift model."""
n = p[0]
a = p[1]
low_order_poly = p[2:]
wave = wv + mu
# logging.debug(f'n:{n:3.2e} a:{a:3.2e} sig:{sig:3.2e} mu:{mu:3.2e} '
# f'poly:{np.array2string(low_order_poly, precision=3, max_line_width=120)}')
low_order_spectrum = np.polyval(low_order_poly, wave - CENTRAL_WAVELENGTH)
solar_spectrum = solar(sig, wave).flatten()
o3_spectrum = o3(sig, wave).flatten()
return low_order_spectrum + np.log(solar_spectrum * np.exp(-np.exp(n + a * wave) * o3_spectrum))
[docs]
class PointSpreadFunctionFit(object):
"""Class for optimizing the PSF parameters for an OSIRIS scan."""
min_alt = 40000
max_alt = 55000
def __init__(self, scanno, model=model_sig_mu):
self.scanno = scanno
self.model = model
self.fit_results = None
# Load data from scan
spectrograph = open_level1_spectrograph(scanno=scanno)
spectrograph['altitude'] = spectrograph.l1.altitude
# reindex to ascending altitiude
self.scan = spectrograph.swap_dims({'pixel': 'wavelength', 'mjd': 'altitude'}).sortby('altitude').data
[docs]
def fit(self, sigma=0.4, mu=0.02) -> namedtuple:
""" Fits point spread function to data for each spectrum between 40 and 55 KM
:param float sigma: initial guess for 1st degree polynomial coefficient
:param float mu: initial guess for 0th degree polynomial coefficient
"""
poly_n = 5
spectra = self.scan.sel(wavelength=slice(300, 361), altitude=slice(PointSpreadFunctionFit.min_alt,
PointSpreadFunctionFit.max_alt))
if spectra.sizes['altitude'] == 0:
logging.error(f'Scan {self.scanno} has no spectra within the altitude range. PSF parameters not found.')
raise LookupError(f'Scan {self.scanno} has no spectra within the PSF altitude range')
if spectra.sizes['altitude'] > 12:
logging.error(f'Scan {self.scanno} has too many altitudes in PSF range. (Not a normal scan orbit).')
raise LookupError(f'Scan {self.scanno} has too many spectra in PSF altitude range.')
logging.info(f"Number of altitudes used in PSF fit for scan {self.scanno}: {spectra.sizes['altitude']}")
# Create the initial guesses for the c, n, and a parameters for each altitude
# and create a scaling factor array for the parameters
# c: absolute brightness of the spectra
# n: first guess neutral density parameter
# a: first guess ozone cross-sectional parameter
params = np.array([])
x_scale = np.array([])
for _, spectrum in spectra.groupby('altitude'):
spectrum = spectrum.dropna(dim='wavelength')
rad305 = spectrum.sel(wavelength=305.3, method='nearest').item()
w305 = spectrum.sel(wavelength=305.3, method='nearest').wavelength.item()
rad328 = spectrum.sel(wavelength=328.0, method='nearest').item()
w328 = spectrum.sel(wavelength=328.0, method='nearest').wavelength.item()
def n0a0(guess):
n, a = guess
return [np.log(np.log(c0 / rad305 * solar(.4, w305).item()) / o3(.4, w305).item()) - n - w305 * a,
np.log(np.log(c0 / rad328 * solar(.4, w328).item()) / o3(.4, w328).item()) - n - w328 * a]
c0 = spectrum.sel(wavelength=355, method='nearest').item() / solar(.4, 355).item()
(n0, a0), infodict, ier, errmsg = fsolve(n0a0, np.array([20.0, .08]), full_output=True)
logging.info(f"Finding the PSF initial n and a guesses took {infodict['nfev']} iterations")
logging.info(errmsg)
logging.debug(f'Altitude {spectrum.altitude.values:4.0f}m: n0: {n0:4.3f}, a0: {a0:4.3f}')
x = spectrum.wavelength - CENTRAL_WAVELENGTH
y = np.log(spectrum.values) \
- self.model(0.4, 0.0, np.r_[[n0, a0], np.zeros(poly_n + 1)], spectrum.wavelength)
p = np.polyfit(x, y, poly_n)
logging.debug(f'PSF initial poly: {np.array2string(p, precision=3, max_line_width=120)}')
params = np.r_[params, [n0, a0], p]
x_scale = np.r_[x_scale, [4.5e-1, 1.44e-3, 1.84e-8, 4.7e-7, 1.18e-5, 2.8e-4, 6.1e-3, 9.6e-2]]
# Vector of the parameters to fit
# It contains all the per image parameters of the problem, and the per scan parameters
params = np.r_[sigma, mu, params]
x_scale = np.r_[0.2, 0.24, x_scale]
# Vectors of the bounds, only the blur and shift are constrained
bounds = (np.ones_like(params) * -np.inf, np.ones_like(params) * np.inf)
bounds[0][0], bounds[1][0] = 0.35, 0.6
bounds[0][1], bounds[1][1] = -0.1, 0.15 # During the transient state
# Cost function of the fit, compare it to the previous example.
def errfunc(p, wv, os):
return np.concatenate(
list(self.model(p[0], p[1], p[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5], wv[~np.isnan(im)])
- np.log(im[~np.isnan(im)]) for i, im in enumerate(os))).ravel()
# Perform the least squares fit, adjusting the verbosity according to the desired logging level.
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
least_squares_verbose = 2
elif logging.getLogger().getEffectiveLevel() == logging.INFO:
least_squares_verbose = 1
else:
least_squares_verbose = 0
self.fit_results = least_squares(
fun=errfunc, x0=params, bounds=bounds, x_scale=x_scale, verbose=least_squares_verbose,
loss='cauchy', f_scale=0.03,
args=(spectra.sel(wavelength=slice(306, 361)).wavelength.values,
spectra.sel(wavelength=slice(306, 361)).values))
if not self.fit_results.success or any(self.fit_results.active_mask):
logging.error('Optimal PSF parameters not found.')
raise RuntimeError('Optimal PSF parameters not found.')
logging.info(f"Finding the PSF took {self.fit_results.nfev} iterations")
logging.info(self.fit_results.message)
# Log the results
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.debug(f'PSF final Sigma: {self.fit_results.x[0]:4.3f}, final Mu: {self.fit_results.x[1]:5.4f}')
for i, (_, spectrum) in enumerate(spectra.groupby('altitude')):
params = self.fit_results.x[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5]
logging.debug(f'Altitude {spectrum.altitude.values:4.0f}m: n: {params[0]:4.3f}, a: {params[1]:4.3f}')
logging.debug(f'PSF final poly: {np.array2string(params[2:], precision=3, max_line_width=120)}')
j = self.fit_results.jac
pcov = np.linalg.inv(j.T.dot(j)) * (self.fit_results.fun ** 2).mean()
perr = np.sqrt(np.diag(pcov))
s_sq_mean = sum(self.fit_results.fun ** 2) / len(self.fit_results.fun)
logging.info(f'PSF details for {self.scanno}: s_sq_mean={s_sq_mean:4.3e}, '
f'sigma={self.fit_results.x[0]:4.3f}±{perr[0]:4.3f}, '
f'mu={self.fit_results.x[1]:5.4f}±{perr[1]:5.4f}')
return PSF_results(self.fit_results.x[0] * SIGMA_TO_FWHM, perr[0] * SIGMA_TO_FWHM,
self.fit_results.x[1], perr[1])
[docs]
def plot(self): # pragma: no cover
""" Plots modelled psf fitting with spectrum data """
import matplotlib.pyplot as plt
spectra = self.scan.sel(wavelength=slice(306, 361), altitude=slice(PointSpreadFunctionFit.min_alt,
PointSpreadFunctionFit.max_alt))
poly_n = (len(self.fit_results.x) - 2) // spectra.sizes['altitude'] - 3
sigma = self.fit_results.x[0]
mu = self.fit_results.x[1]
fwhm = sigma * SIGMA_TO_FWHM
for i, (alt, spectrum) in enumerate(spectra.groupby('altitude')):
params = self.fit_results.x[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5]
modelled = [self.model(sigma, mu, params, wv) for wv in spectrum.wavelength]
modelled_blur = [self.model(sigma * 1.0, mu - 0.05, params, wv) for wv in spectrum.wavelength]
modelled_sharp = [self.model(sigma / 1.0, mu + 0.05, params, wv) for wv in spectrum.wavelength]
fig, axs = plt.subplots(2, 1, figsize=(10, 10))
np.log(spectrum).plot.line('k.', ax=axs[0])
axs[0].plot(spectrum.wavelength, modelled,
spectrum.wavelength, modelled_blur,
spectrum.wavelength, modelled_sharp)
axs[0].set_title(f'Scan: {self.scanno}, Polynomial Order: {poly_n}, FWHM: {fwhm:.3}')
rsq = ((np.log(spectrum) - modelled) ** 2).sum().values.take(0)
rsq_blur = ((np.log(spectrum) - modelled_blur) ** 2).sum().values.take(0)
rsq_sharp = ((np.log(spectrum) - modelled_sharp) ** 2).sum().values.take(0)
axs[1].plot(spectrum.wavelength, np.log(spectrum) - modelled,
spectrum.wavelength, np.log(spectrum) - modelled_blur,
spectrum.wavelength, np.log(spectrum) - modelled_sharp)
axs[1].axhline(0, color='k', linestyle='--')
axs[1].set_title(f'Residual Rsq: {rsq:.4}, Blurred Rsq: {rsq_blur:.4}, Sharp Rsq: {rsq_sharp:.4}, ')
plt.ylim(-0.06, 0.06)
plt.show()