Source code for psf_fit

import numpy as np
from collections import namedtuple
from scipy.optimize import fsolve, least_squares
from osirisl1services.blurreddata import SAO2010, O3BDM
from osirisl1services.readlevel1 import open_level1_spectrograph
from osirisl1services.services import Level1Services
import logging

CENTRAL_WAVELENGTH = 335.0
SIGMA_TO_FWHM = np.sqrt(8.0 * np.log(2.0))

PSF_results = namedtuple('PSF_results', ['fwhm', 'fwhm_error', 'shift', 'shift_error'])

# If debugging, use faster to load solar and o3 interps
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:  # pragma: no cover
    logging.warning('Using debug versions of the solar and o3 psf interp objects.')
    solar = SAO2010(sigma=np.arange(.35, .52, 0.03))
    o3 = O3BDM(sigma=np.arange(.35, .52, 0.03))
else:
    solar = SAO2010()
    o3 = O3BDM()


# Target function
[docs]def model_sig_mu(sig, mu, p, wv): """Simple blur and shift model.""" n = p[0] a = p[1] low_order_poly = p[2:] wave = wv + mu # logging.debug(f'n:{n:3.2e} a:{a:3.2e} sig:{sig:3.2e} mu:{mu:3.2e} ' # f'poly:{np.array2string(low_order_poly, precision=3, max_line_width=120)}') low_order_spectrum = np.polyval(low_order_poly, wave - CENTRAL_WAVELENGTH) solar_spectrum = solar(sig, wave).flatten() o3_spectrum = o3(sig, wave).flatten() return low_order_spectrum + np.log(solar_spectrum * np.exp(-np.exp(n + a * wave) * o3_spectrum))
[docs]class PointSpreadFunctionFit(object): """Class for optimizing the PSF parameters for an OSIRIS scan.""" min_alt = 40000 max_alt = 55000 def __init__(self, scanno, model=model_sig_mu): self.scanno = scanno self.model = model self.fit_results = None # Load data from scan spectrograph = open_level1_spectrograph(scanno=scanno) spectrograph['altitude'] = spectrograph.l1.altitude # reindex to ascending altitiude self.scan = spectrograph.swap_dims({'pixel': 'wavelength', 'mjd': 'altitude'}).sortby('altitude').data
[docs] def fit(self, sigma=0.4, mu=0.02) -> namedtuple: """ Fits point spread function to data for each spectrum between 40 and 55 KM :param float sigma: initial guess for 1st degree polynomial coefficient :param float mu: initial guess for 0th degree polynomial coefficient """ poly_n = 5 spectra = self.scan.sel(wavelength=slice(300, 361), altitude=slice(PointSpreadFunctionFit.min_alt, PointSpreadFunctionFit.max_alt)) if spectra.sizes['altitude'] == 0: logging.error(f'Scan {self.scanno} has no spectra within the altitude range. PSF parameters not found.') raise LookupError(f'Scan {self.scanno} has no spectra within the PSF altitude range') if spectra.sizes['altitude'] > 12: logging.error(f'Scan {self.scanno} has too many altitudes in PSF range. (Not a normal scan orbit).') raise LookupError(f'Scan {self.scanno} has too many spectra in PSF altitude range.') logging.info(f"Number of altitudes used in PSF fit for scan {self.scanno}: {spectra.sizes['altitude']}") # Create the initial guesses for the c, n, and a parameters for each altitude # and create a scaling factor array for the parameters # c: absolute brightness of the spectra # n: first guess neutral density parameter # a: first guess ozone cross-sectional parameter params = np.array([]) x_scale = np.array([]) for _, spectrum in spectra.groupby('altitude'): spectrum = spectrum.dropna(dim='wavelength') rad305 = spectrum.sel(wavelength=305.3, method='nearest').item() w305 = spectrum.sel(wavelength=305.3, method='nearest').wavelength.item() rad328 = spectrum.sel(wavelength=328.0, method='nearest').item() w328 = spectrum.sel(wavelength=328.0, method='nearest').wavelength.item() def n0a0(guess): n, a = guess return [np.log(np.log(c0 / rad305 * solar(.4, w305).item()) / o3(.4, w305).item()) - n - w305 * a, np.log(np.log(c0 / rad328 * solar(.4, w328).item()) / o3(.4, w328).item()) - n - w328 * a] c0 = spectrum.sel(wavelength=355, method='nearest').item() / solar(.4, 355).item() (n0, a0), infodict, ier, errmsg = fsolve(n0a0, np.array([20.0, .08]), full_output=True) logging.info(f"Finding the PSF initial n and a guesses took {infodict['nfev']} iterations") logging.info(errmsg) logging.debug(f'Altitude {spectrum.altitude.values:4.0f}m: n0: {n0:4.3f}, a0: {a0:4.3f}') x = spectrum.wavelength - CENTRAL_WAVELENGTH y = np.log(spectrum.values) \ - self.model(0.4, 0.0, np.r_[[n0, a0], np.zeros(poly_n + 1)], spectrum.wavelength) p = np.polyfit(x, y, poly_n) logging.debug(f'PSF initial poly: {np.array2string(p, precision=3, max_line_width=120)}') params = np.r_[params, [n0, a0], p] x_scale = np.r_[x_scale, [4.5e-1, 1.44e-3, 1.84e-8, 4.7e-7, 1.18e-5, 2.8e-4, 6.1e-3, 9.6e-2]] # Vector of the parameters to fit # It contains all the per image parameters of the problem, and the per scan parameters params = np.r_[sigma, mu, params] x_scale = np.r_[0.2, 0.24, x_scale] # Vectors of the bounds, only the blur and shift are constrained bounds = (np.ones_like(params) * -np.inf, np.ones_like(params) * np.inf) bounds[0][0], bounds[1][0] = 0.35, 0.6 bounds[0][1], bounds[1][1] = -0.1, 0.15 # During the transient state # Cost function of the fit, compare it to the previous example. def errfunc(p, wv, os): return np.concatenate( list(self.model(p[0], p[1], p[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5], wv[~np.isnan(im)]) - np.log(im[~np.isnan(im)]) for i, im in enumerate(os))).ravel() # Perform the least squares fit, adjusting the verbosity according to the desired logging level. if logging.getLogger().getEffectiveLevel() == logging.DEBUG: least_squares_verbose = 2 elif logging.getLogger().getEffectiveLevel() == logging.INFO: least_squares_verbose = 1 else: least_squares_verbose = 0 self.fit_results = least_squares( fun=errfunc, x0=params, bounds=bounds, x_scale=x_scale, verbose=least_squares_verbose, loss='cauchy', f_scale=0.03, args=(spectra.sel(wavelength=slice(306, 361)).wavelength.values, spectra.sel(wavelength=slice(306, 361)).values)) if not self.fit_results.success or any(self.fit_results.active_mask): logging.error('Optimal PSF parameters not found.') raise RuntimeError('Optimal PSF parameters not found.') logging.info(f"Finding the PSF took {self.fit_results.nfev} iterations") logging.info(self.fit_results.message) # Log the results if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.debug(f'PSF final Sigma: {self.fit_results.x[0]:4.3f}, final Mu: {self.fit_results.x[1]:5.4f}') for i, (_, spectrum) in enumerate(spectra.groupby('altitude')): params = self.fit_results.x[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5] logging.debug(f'Altitude {spectrum.altitude.values:4.0f}m: n: {params[0]:4.3f}, a: {params[1]:4.3f}') logging.debug(f'PSF final poly: {np.array2string(params[2:], precision=3, max_line_width=120)}') j = self.fit_results.jac pcov = np.linalg.inv(j.T.dot(j)) * (self.fit_results.fun ** 2).mean() perr = np.sqrt(np.diag(pcov)) s_sq_mean = sum(self.fit_results.fun ** 2) / len(self.fit_results.fun) logging.info(f'PSF details for {self.scanno}: s_sq_mean={s_sq_mean:4.3e}, ' f'sigma={self.fit_results.x[0]:4.3f}±{perr[0]:4.3f}, ' f'mu={self.fit_results.x[1]:5.4f}±{perr[1]:5.4f}') return PSF_results(self.fit_results.x[0] * SIGMA_TO_FWHM, perr[0] * SIGMA_TO_FWHM, self.fit_results.x[1], perr[1])
[docs] def plot(self): # pragma: no cover """ Plots modelled psf fitting with spectrum data """ import matplotlib.pyplot as plt spectra = self.scan.sel(wavelength=slice(306, 361), altitude=slice(PointSpreadFunctionFit.min_alt, PointSpreadFunctionFit.max_alt)) poly_n = (len(self.fit_results.x) - 2) // spectra.sizes['altitude'] - 3 sigma = self.fit_results.x[0] mu = self.fit_results.x[1] fwhm = sigma * SIGMA_TO_FWHM for i, (alt, spectrum) in enumerate(spectra.groupby('altitude')): params = self.fit_results.x[(poly_n + 3) * i + 2:(poly_n + 3) * i + poly_n + 5] modelled = [self.model(sigma, mu, params, wv) for wv in spectrum.wavelength] modelled_blur = [self.model(sigma * 1.0, mu - 0.05, params, wv) for wv in spectrum.wavelength] modelled_sharp = [self.model(sigma / 1.0, mu + 0.05, params, wv) for wv in spectrum.wavelength] fig, axs = plt.subplots(2, 1, figsize=(10, 10)) np.log(spectrum).plot.line('k.', ax=axs[0]) axs[0].plot(spectrum.wavelength, modelled, spectrum.wavelength, modelled_blur, spectrum.wavelength, modelled_sharp) axs[0].set_title(f'Scan: {self.scanno}, Polynomial Order: {poly_n}, FWHM: {fwhm:.3}') rsq = ((np.log(spectrum) - modelled) ** 2).sum().values.take(0) rsq_blur = ((np.log(spectrum) - modelled_blur) ** 2).sum().values.take(0) rsq_sharp = ((np.log(spectrum) - modelled_sharp) ** 2).sum().values.take(0) axs[1].plot(spectrum.wavelength, np.log(spectrum) - modelled, spectrum.wavelength, np.log(spectrum) - modelled_blur, spectrum.wavelength, np.log(spectrum) - modelled_sharp) axs[1].axhline(0, color='k', linestyle='--') axs[1].set_title(f'Residual Rsq: {rsq:.4}, Blurred Rsq: {rsq_blur:.4}, Sharp Rsq: {rsq_sharp:.4}, ') plt.ylim(-0.06, 0.06) plt.show()