Source code for LOTUS_regression.regression

import statsmodels.api as sm
import numpy as np
from collections import namedtuple
import xarray as xr
import pandas as pd
from copy import copy
import statsmodels


def _corrected_ar1_covariance(sigma, gaps, rho):
    """
    Calculates the corrected covariance matrix accounting for AR1 structure, this is the Prais and Winsten covariance
    structure with a modification by Savin and White (1978) to account for gaps in the data.

    Parameters
    ----------
    sigma : np.array
        Length n, square root of the diagonal elements of the uncorrected covariance matrix
    gaps : Iterable of tuples
        Each element of gaps must be a tuple containing a length and index field.  index is the location where the gap
        occurs and length is the number of elements in the gap
    rho : float
        AR1 autocorrelation parameter

    Returns
    -------
    np.ndarray
        Covariance matrix (n, n) which accounts for the AR1 structure

    """
    G = np.zeros((len(sigma), len(sigma)))

    for i in range(len(sigma)):
        if i == 0:
            G[i, i] = np.sqrt(1-rho**2)
        else:
            G[i, i] = 1
            G[i, i-1] = -rho

    if gaps is not None:
        for gap in gaps:
            index = gap.index
            if index >= len(sigma):
                break
            m = gap.length

            g = np.sqrt((1-rho**2) / (1-rho**(2*m+2)))

            G[index, index] = g
            G[index, index-1] = -g*rho**(m+1)

    covar = np.linalg.inv(G.T.dot(G))
    covar *= np.outer(sigma, sigma)

    return covar


def _remove_nans_and_find_gaps(X, Y, sigma):
    """
    Preprocesses the data by removing NaN's and in the process finds gaps in the data.

    Parameters
    ----------
    X : np.ndarray
        (nsamples, npredictors)  predictor matrix
    Y : np.array
        (nsamples) observations
    sigma : np.array
        (nsamples) Square root of the diagonal elements of the covariance matrix for Y

    Returns
    -------
    X : np.ndarray

    Y : np.array

    sigma : np.array

    gaps : Iterable tuple

    good_index :


    """
    Gap = namedtuple('Gap', ['length', 'index'])

    good_data = ~np.isnan(Y) & (sigma > 0)
    Y_fixed = np.zeros((np.sum(good_data)))

    gaps = []

    # Y is a little tricky because the index constantly changes, have i go over Y and i_yfix go over Y_fixed
    i = 0
    i_yfix = 0
    while i < len(Y):
        if np.isnan(Y[i]) or (sigma[i] == 0):
            gap_start = i

            while i < len(Y) and (np.isnan(Y[i]) or (sigma[i] == 0)):
                i += 1
            gap_end = i

            gaps.append(Gap(gap_end - gap_start, i_yfix))
        if i_yfix >= len(Y_fixed):
            break
        Y_fixed[i_yfix] = Y[i]
        i += 1
        i_yfix += 1

    return X[good_data, :], Y_fixed, sigma[good_data], gaps, good_data


def _heteroscedasticity_fit_values(num_resid, seasonal_harmonics=(3, 4, 6, 12), extra_predictors=None,
                                   merged_flag=None, treat_merged_periods_differently=True):
    """

    Parameters
    ----------
    num_resid : int
        number of residuals
    seasonal_harmonics : tuple, optional
        Default is (3,4,6,12), these are the harmonics used to create the seasonal predictors.  sins and cosines are used
        as predictor
    extra_predictors : np.ndarray
        (nsamples, x) adds extra predictors to be used in addition to the sins and cosines.

    merged_flag : np.array, optional
        Flag indicating different 'modes' of the merged dataset. For example, this flag is commonly used to describe
        different instaument time periods in a merged dataset.
    """

    month_index = np.arange(0, num_resid)

    if merged_flag is None:
        unique_modes = [0]
        merged_flag = np.zeros(num_resid)
    else:
        unique_modes = np.unique(merged_flag)

    if treat_merged_periods_differently:
        X = np.zeros((num_resid, 2 * len(seasonal_harmonics) * len(unique_modes)))
        for idy, mode in enumerate(unique_modes):
            mask = (merged_flag == mode).astype(int)
            for idx, harmonic in enumerate(seasonal_harmonics):
                X[:, 2*idx + 2 * len(seasonal_harmonics) * idy] = np.cos(2*np.pi*month_index / harmonic) * mask
                X[:, 2*idx + 2 * len(seasonal_harmonics) * idy + 1] = np.sin(2*np.pi*month_index / harmonic) * mask

        if len(unique_modes) > 1:
            # Multiple modes, add constants to allow for varying weights between modes
            X_constants = np.zeros((num_resid, len(unique_modes)))
            for idy, mode in enumerate(unique_modes):
                mask = (merged_flag == mode).astype(int)

                X_constants[:, idy] = mask

            X = np.hstack((X, X_constants))
    else:
        if np.max(unique_modes) == 0:
            num_bits = 1
        else:
            num_bits = int(np.floor(np.log2(np.max(unique_modes)))) + 1
        X = np.zeros((num_resid, 2 * len(seasonal_harmonics) * num_bits))

        for bit in range(num_bits):
            mask = ((merged_flag.astype(int) & 2**bit) > 0).astype(int)
            for idx, harmonic in enumerate(seasonal_harmonics):
                X[:, 2*idx + 2 * len(seasonal_harmonics) * bit] = np.cos(2*np.pi*month_index / harmonic) * mask
                X[:, 2*idx + 2 * len(seasonal_harmonics) * bit + 1] = np.sin(2*np.pi*month_index / harmonic) * mask

        if len(unique_modes) > 1:
            # Multiple modes, add constants to allow for varying weights between modes
            X_constants = np.zeros((num_resid, num_bits))
            for bit in range(num_bits):
                mask = ((merged_flag.astype(int) & 2 ** bit) > 0).astype(int)

                X_constants[:, bit] = mask
            X = np.hstack((X, X_constants))

    if extra_predictors is not None:
        X = np.hstack((X, extra_predictors))

    return X


def _heteroscedasticity_correction_factors(residual, fit_functions, log_space=False, damping=0.5):
    """
    Finds the heteroscedasticity correction factors outlined in Damadeo et al. 2014.  This is done by fitting
    log(residual^2) to a set of predictors. Ideally the residuals should be completely random and thus these fit values
    are 0, however that is not usually the case in practice.  The calculated correction factors can vary greatly based
    on the set of predictors used.

    Parameters
    ----------
    residual : np.array
        (nsamples) residuals of the fit.  Note that these are the residuals under the transformed variables of the GLS,
        i.e., including the autocorrelation correction done.

    fit_functions : np.ndarray

    Returns
    -------
    Multiplicative correction factors to be applied to the square root of the diagonal elements of the covariance
    matrix.

    """
    Y = residual**2
    if log_space:
        Y = np.log(Y)
    model = sm.OLS(Y, fit_functions)

    results = model.fit()

    f = results.fittedvalues
    if log_space:
        f = np.exp(f)

    correction_factors = (np.sqrt(np.abs(f)) - 1)*damping + 1

    # Sometimes we can get values really close to 0, so dont let the weights change by more than 2 orders of magnitude
    correction_factors[correction_factors <= 1e-2] = 1e-2
    correction_factors[correction_factors >= 1e2] = 1e2

    return correction_factors


[docs]def mzm_regression(X, Y, sigma=None, tolerance=1e-2, max_iter=50, do_autocorrelation=True, do_heteroscedasticity=False, extra_heteroscedasticity=None, heteroscedasticity_merged_flag=None, seasonal_harmonics=(3, 4, 6, 12), constrain_ilt_gap=False, ilt_predictor_index_dict=None): """ Performs the regression for a single bin. Parameters ---------- X : np.ndarray (nsamples, npredictors) Array of predictor basis functions Y : np.array (nsamples) Observations sigma : np.array (nsamples) Square root of the diagonal elements of the covariance matrix for Y tolerance : float, optional Iterations stop when the relative difference in the AR1 coefficient is less than this threshold. Default 1e-2 max_iter : int, optional Maximum number of iterations to perform. Default 50 do_autocorrelation : bool, optional If true, do the AR1 autocorrelation correction on the covariance matrix. Default True. do_heteroscedasticity : bool, optional If True, do the heteroscedasticity correction on the covariance matrix. Default False. extra_heteroscedasticity : np.ndarray, optional (nsamples, nextrapredictors) Extra predictor functions to use in the heteroscedasticity correction. heteroscedasticity_merged_flag : np.ndarray, optional (nsamples) A flag indicating time periods that should be treated independently in the heteroscedasticity correction. E.g. this could be something like [0, 0, 0, 0, 2, 2, 2, 1, 1, 1] which would create 3 independant time periods where the heteroscedasticity correction is applied. seasonal_harmonics : Iterable Float, optional. Default (3, 4, 6, 12) The monthly harmonics to use in the heteroscedasticity correction. constrain_ilt_gap : bool, optional. Default False. If true then a constraint is added so that the ILT terms in the gap period enforce continuity. This must be set in conjunction with ilt_predictor_index_dict ilt_predictor_index_dict : dict, optional. Default None. If using constrain_ilt_gap, this must be a dictionary {predictor_name: index_in X} which contains the indicies of the predictors 'gap_cons', 'pre_const', 'post_const', 'gap_linear', 'linear_pre', 'linear_post' Returns ------- results : dict a dictionary of outputs with keys: ``gls_results`` The raw regression output. This is an instance of `RegressionResults` which is documented at http://www.statsmodels.org/dev/generated/statsmodels.regression.linear_model.RegressionResults.html ``residual`` Residuals of the fit in the original coordinate system. ``transformed_residuals`` Residuals of the fit in the GLS transformed coordinates. ``autocorrelation`` The AR1 correlation constant. ``numiter`` Number of iterative steps performed. ``covariance`` Calculated covariance of Y that is input to the GLS model. """ # If we have no weights then still do a weighted regression but use sigma=1 if sigma is None: sigma = np.ones_like(Y, dtype=float) # Set up some output variables, we want the output to be the same shape as the input but we have to preprocess the # input and remove NaN's residuals = np.ones_like(Y)*np.nan fit_values = np.ones_like(Y)*np.nan transformed_residuals_out = np.ones_like(Y)*np.nan sigma_out = np.ones_like(Y)*np.nan # Do some preprocessing X, Y, sigma, gaps, good_index = _remove_nans_and_find_gaps(X, Y, sigma) if extra_heteroscedasticity is not None: extra_heteroscedasticity = extra_heteroscedasticity[good_index] if heteroscedasticity_merged_flag is not None: heteroscedasticity_merged_flag = heteroscedasticity_merged_flag[good_index] if do_heteroscedasticity: heteroscedasticity_X = _heteroscedasticity_fit_values(len(Y), extra_predictors=extra_heteroscedasticity, merged_flag=heteroscedasticity_merged_flag, treat_merged_periods_differently=True, seasonal_harmonics=seasonal_harmonics) # Initial covariance matrix for the GLS with diagonal structure covar = np.diag(sigma**2) # Set up some variables for convergence testing, funny value but |rho| has to be less than one so this should be # okay rho_prior = -10000 # Output output = dict() # Main loop for i in range(max_iter): # GLS Model model = sm.GLS(Y, X, covar) results = model.fit() if constrain_ilt_gap: model_glm = sm.GLM(model.wendog, model.wexog) gap_const = model.exog[:, ilt_predictor_index_dict['gap_cons']] start_x_index = np.nonzero(gap_const)[0][0] end_x_index = np.nonzero(gap_const)[0][-1] constraint_R = np.zeros((2, X.shape[1])) constraint_q = np.zeros(2) # first continuity constraint is at start index, gap_const = pre_const constraint_R[0, ilt_predictor_index_dict['gap_cons']] = 1 constraint_R[0, ilt_predictor_index_dict['pre_const']] = -1 constraint_q[0] = 0 # Second continuity constraint is at the end index, gap_const + gap_linear = post_const constraint_R[1, ilt_predictor_index_dict['gap_cons']] = 1 constraint_R[1, ilt_predictor_index_dict['gap_linear']] = model.exog[end_x_index, ilt_predictor_index_dict['gap_linear']] constraint_R[1, ilt_predictor_index_dict['post_const']] = -1 constraint_q[1] = 0 results = model_glm.fit_constrained((constraint_R, constraint_q)) results.resid = (model.wendog - results.fittedvalues) # Find the autocorrelation coefficient, if we are not doing autocorrelation set rho to 0 so that later our # variable transformations are still valid if do_autocorrelation: rho, _ = sm.regression.yule_walker(results.resid, order=1) else: rho = 0 # Find the residuals in the "transformed" GLS units. transformed_residuals = np.dot(results.resid, model.cholsigmainv) if do_heteroscedasticity and i > 0: correction_factors = _heteroscedasticity_correction_factors(transformed_residuals, heteroscedasticity_X, log_space=True) sigma *= correction_factors if not do_autocorrelation: # If we arent doing autocorrelation we have to reset the covariance matrix, if we are this will be done # in the next step covar = np.diag(sigma**2) if do_autocorrelation: covar = _corrected_ar1_covariance(sigma, gaps, rho) if not do_autocorrelation and not do_heteroscedasticity: # There is nothing here to iterate, break # If converged we can stop if np.abs((rho - rho_prior)) < tolerance and i > 0: if not do_heteroscedasticity: break else: if np.linalg.norm(correction_factors - 1) < tolerance: break else: rho_prior = rho residuals[good_index] = results.resid fit_values[good_index] = results.fittedvalues output['gls_results'] = results output['residual'] = residuals if do_autocorrelation: transformed_residuals_out[good_index] = transformed_residuals output['transformed_residuals'] = transformed_residuals_out output['autocorrelation'] = rho sigma_out[good_index] = sigma output['corrected_weights'] = sigma_out output['numiter'] = i output['fit_values'] = fit_values output['covariance'] = covar return output
[docs]def regress_all_bins(predictors, mzm_data, time_field='time', debug=False, sigma=None, post_fit_trend_start=None, include_monthly_fits=False, return_raw_results=False, constrain_ilt_gap=False, **kwargs): """ Performs the regression for a dataset in all bins. Parameters ---------- predictors : pd.Dataframe Dataframe of predictors to use in the regression. Index should be a time field. mzm_data : xr.DataArray DataArray containing the monthly zonal mean data in a variety of bins. The data should be three dimensional with one dimension representing time. The other two dimensions are typically latitude and a vertical coordinate. time_field : string Name of the time field in the mzm_data structure sigma : xr.DataArray, optional. Default None If not None then the regression is weighted as if sigma is the standard deviation of mzm_data. Should be in the same format as mzm_data. post_fit_trend_start : datetimelike, optional. Default None If set to a datetime like object (example: '2000-01-01') then a linear trend is post fit to the residuals with the specified start date. If this is set you should not include a linear term in the predictors or the results will not be valid constrain_ilt_gap: bool, optional. Default False If True then a constraint is added to the regression so that the fit terms in the gap period maintain continunity when doing ILT trends. The predictors must have keys 'gap_cons', 'post_const', 'pre_const', 'linear_pre', 'linear_post', and 'gap_linear' for this option to work. This is the standard case when using the 'predictors_baseline_ilt_linear_gap' file. kwargs Other arguments passed to mzm_regression Returns ------- xr.Dataset """ mzm_data = mzm_data.rename({time_field, 'time'}) mzm_data = mzm_data.reindex(time=pd.date_range(mzm_data.time.values[0], mzm_data.time.values[-1], freq=pd.DateOffset(months=1)), tolerance=pd.Timedelta(days=1), method='nearest') min_time = mzm_data.time.values[0] max_time = mzm_data.time.values[-1] min_time = pd.to_datetime(min_time) if post_fit_trend_start is not None: post_fit_time_index = mzm_data.time.values >= np.datetime64(post_fit_trend_start) t = mzm_data.time.values[post_fit_time_index] # Doesn't account for leap years X_post_fit_trend = (t - t[0])/10 / np.timedelta64(365, 'D') X_post_fit_trend -= np.nanmean(X_post_fit_trend) X_post_fit_trend = X_post_fit_trend.reshape((-1, 1)) X_post_fit_trend = sm.add_constant(X_post_fit_trend, prepend=False) if min_time.day != 1: min_time -= pd.DateOffset(months=1) if isinstance(predictors.index, pd.DatetimeIndex): tstamp = pd.to_datetime(predictors.index) else: tstamp = predictors.index.to_timestamp() predictors = predictors[(tstamp >= min_time) & (tstamp <= max_time)] pred_list = list(predictors.columns.values) seasonal_comp_info = dict() if include_monthly_fits: # Find the predictors with seasonal components seasonal_comps = [a for a in pred_list if 'sin0' in a or 'cos0' in a] seasonal_comps = list(set([s[:-5] for s in seasonal_comps])) days = pd.date_range('1997-01-01', '1997-12-01', freq='MS') day_factor = (days.dayofyear - 1) / 365.25 for seasonal_comp in seasonal_comps: num_comp = int((len([a for a in pred_list if seasonal_comp in a]) - 1) / 2) seasonal_comp_info[seasonal_comp] = dict() seasonal_comp_info[seasonal_comp]['num_component'] = num_comp seasonal_comp_info[seasonal_comp]['pred_matrix'] = np.zeros((len(pred_list), len(day_factor))) for idx in range(num_comp): cos_idx = pred_list.index(seasonal_comp + '_cos{}'.format(idx)) sin_idx = pred_list.index(seasonal_comp + '_sin{}'.format(idx)) seasonal_comp_info[seasonal_comp]['pred_matrix'][cos_idx, :] = np.cos(2*np.pi*day_factor * idx+1) seasonal_comp_info[seasonal_comp]['pred_matrix'][sin_idx, :] = np.sin(2*np.pi*day_factor * idx+1) const_idx = pred_list.index(seasonal_comp) seasonal_comp_info[seasonal_comp]['pred_matrix'][const_idx, :] = 1 if len(predictors) != len(mzm_data.time.values): raise ValueError('Input predictors do not cover the full time period') # (nsamples, npredictors) matrix X = predictors.values coords = [] for c in mzm_data.coords.dims: if c != 'time': coords.append(c) assert(len(coords) <= 2) c_list = {c: mzm_data[c].values for c in coords} ret = xr.Dataset(coords=c_list) sized_nans = np.ones([len(mzm_data[c].values) for c in coords]) * np.nan if include_monthly_fits: ret.coords['month'] = np.arange(1, 12.5, 1).astype(int) for pred in pred_list: ret[pred] = (coords, copy(sized_nans)) ret[pred + "_std"] = (coords, copy(sized_nans)) if pred in seasonal_comp_info: ret[pred + '_monthly'] = (np.concatenate((coords, ['month'])), np.repeat(sized_nans[:, :, np.newaxis], 12, axis=2)) ret[pred + '_monthly_std'] = (np.concatenate((coords, ['month'])), np.repeat(sized_nans[:, :, np.newaxis], 12, axis=2)) if post_fit_trend_start is not None: ret['linear_post'] = (coords, copy(sized_nans)) ret['linear_post' + "_std"] = (coords, copy(sized_nans)) if return_raw_results: raw_results = np.empty((len(mzm_data[coords[0]].values), len(mzm_data[coords[1]].values)), dtype=statsmodels.regression.linear_model.RegressionResultsWrapper) if constrain_ilt_gap: necessary_predictors = ['gap_cons', 'pre_const', 'post_const', 'linear_pre', 'linear_post', 'gap_linear'] constrain_index_dict = dict() for pred in necessary_predictors: if pred not in pred_list: raise ValueError('When using constrain_ilt_gap the predictors must contain gap_cons, pre_const, post_const, linear_pre, linear_post, and gap_linear.') constrain_index_dict[pred] = pred_list.index(pred) else: constrain_index_dict = None for id_x, x in enumerate(mzm_data[coords[0]].values): for id_y, y in enumerate(mzm_data[coords[1]].values): sliced_data = mzm_data.loc[{coords[0]: x, coords[1]: y}] Y = sliced_data.values if sigma is not None: sigma_bin = sigma.loc[{coords[0]: x, coords[1]: y}].values if post_fit_trend_start is not None: sigma_post_fit = sigma_bin[post_fit_time_index] else: sigma_bin = None if post_fit_trend_start is not None: sigma_post_fit = None try: output = mzm_regression(X, Y, sigma=sigma_bin, constrain_ilt_gap=constrain_ilt_gap, ilt_predictor_index_dict=constrain_index_dict, **kwargs) std_error = np.sqrt(np.diag(output['gls_results'].cov_params())) if return_raw_results: raw_results[id_x, id_y] = output['gls_results'] for idx, col in enumerate(pred_list): ret[col][id_x, id_y] = output['gls_results'].params[idx] ret[col + '_std'][id_x, id_y] = std_error[idx] if col in seasonal_comp_info: ret[col + '_monthly'][id_x, id_y] = seasonal_comp_info[col]['pred_matrix'].T @ output['gls_results'].params ret[col + '_monthly_std'][id_x, id_y] = np.sqrt(np.diag(seasonal_comp_info[col][ 'pred_matrix'].T @ output['gls_results'].cov_params() @ seasonal_comp_info[col][ 'pred_matrix'])) if post_fit_trend_start is not None: Y_post_fit = output['residual'][post_fit_time_index] # If we did the initial fit with linear terms or the EESC we need to add these back in to the # residuals for idx, pred in enumerate(pred_list): if 'linear' in pred.lower() or 'eesc' in pred.lower(): contrib = output['gls_results'].params[idx] * X[post_fit_time_index, idx] Y_post_fit += contrib Y_post_fit -= np.nanmean(Y_post_fit) post_fit_output = mzm_regression(X_post_fit_trend, Y_post_fit, sigma_post_fit, do_autocorrelation=kwargs.get('do_autocorrelation', True), constrain_ilt_gap=constrain_ilt_gap, ilt_predictor_index_dict=constrain_index_dict) std_error = np.sqrt(np.diag(post_fit_output['gls_results'].cov_params())) ret['linear_post'][id_x, id_y] = post_fit_output['gls_results'].params[0] ret['linear_post_std'][id_x, id_y] = std_error[0] except Exception as e: if debug: print(e) if return_raw_results: return ret, raw_results else: return ret