dubfi.fluxes.util
=================

.. py:module:: dubfi.fluxes.util

.. autoapi-nested-parse::

   Utility functions for the flux inversion problem.

   .. codeauthor:: Valentin Bruch, DWD

   .. versionadded:: 0.1.0 (initial release)


Functions
---------

.. autoapisummary::

   dubfi.fluxes.util.get_b_prior
   dubfi.fluxes.util.get_prior_cycling
   dubfi.fluxes.util.mix_b
   dubfi.fluxes.util.get_localization
   dubfi.fluxes.util.get_localization_sparse
   dubfi.fluxes.util.chunk_corr_std
   dubfi.fluxes.util.fill_chunks


Module Contents
---------------

.. py:function:: get_b_prior(flux_cat, config)

   Construct the a priori B matrix from the configuration.

   :param flux_cat: names of flux categories (as used in the configuration); the result is aligned to this list.
   :type flux_cat: np.ndarray
   :param config: configuration
   :type config: dict

   :returns: **b_prior** -- prior uncertainty matrix (B)
   :rtype: np.ndarray


.. py:function:: get_prior_cycling(ds, flux_cat, config)

   Construct the a priori B matrix in a cycling step from the results of the previous cycle.

   :param ds: output of the previous cycle; coordinates must agree with the provided coordinates.
   :type ds: xr.Dataset
   :param flux_cat: names of flux categories; the result is aligned to this list.
   :type flux_cat: array
   :param config: configuration
   :type config: dict

   :returns: * **s_prior** (*np.ndarray*) -- prior scaling factors for the simpler Kalman filter, shape (flux_cat,)
             * **b_prior** (*np.ndarray*) -- error covariance matrix of s_prior, shape (flux_cat, flux_cat)
             * **s_prior_norm_pref** (*np.ndarray*) -- prior scaling factors for the extended inversion, shape (norm_prefactor, flux_cat)
             * **b_prior_norm_pref** (*np.ndarray*) -- error covariance matrix of s_prior_norm_pref, shape (norm_prefactor, flux_cat, flux_cat)

   .. rubric:: Remarks

   This function mixes the initial prior and the posterior from a previous
   inversion step. The posterior uncertainty is optionally inflated before
   mixing with the initial prior. It assumes that the uncertainties of the
   initial prior and the posterior are maximally correlated, see
   :func:`mix_b`.

   .. versionadded:: 0.1.1


.. py:function:: mix_b(alpha, b1, b2, sqrt_b1=None)

   Compute the error covariance matrix of alpha * x1 + (1 - alpha) * x2 from the error covariance matrices of x1 and x2.

   Consider two vectors x1, x2 with error covariance matrices B1, B2.
   Denote the cross-covariance matrix of x1 and x2 by B12, i.e., using
   cumulant notation::

      B1[i,j]  = << x1[i] x1[j] >>,
      B2[i,j]  = << x2[i] x2[j] >>,
      B12[i,j] = << x1[i] x2[j] >>.

   Then the error covariance matrix of x1 + x2 is
   Bsum = B1 + B2 + B12 + B12.T.

   We assume that B12 = sqrt(B1) sqrt(B2), such that the error covariance
   matrix of the combined vector (x1, x2) is singular with minimal rank
   (equal to rank(B1) = rank(B2)). This describes the strongest possible
   uncertainty correlation between x1 and x2.

   The output of this function is therefore (denoting a = alpha)::

      a**2 * B1 + (1-a)**2 * B2 + a*(1-a) * (sqrt(B1) sqrt(B2) + sqrt(B2) sqrt(B1))

   .. note::

      b1 and b2 must be real-symmetric and positive-semidefinite.

   .. versionadded:: 0.1.1
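   As an illustration, the mixing formula can be written directly in NumPy.
   The following is a minimal sketch, assuming ``scipy.linalg.sqrtm`` for
   the matrix square roots; it is not the packaged implementation, and
   ``mix_b_sketch`` is a hypothetical name:

   .. code-block:: python

      import numpy as np
      from scipy.linalg import sqrtm

      def mix_b_sketch(alpha, b1, b2, sqrt_b1=None):
          # Matrix square roots; sqrt_b1 may be passed in to avoid
          # recomputing it. sqrtm can return tiny imaginary parts for
          # semidefinite inputs, so keep only the real part.
          s1 = np.real(sqrtm(b1)) if sqrt_b1 is None else sqrt_b1
          s2 = np.real(sqrtm(b2))
          # Cross term B12 + B12.T with B12 = sqrt(B1) sqrt(B2).
          cross = s1 @ s2 + s2 @ s1
          return alpha**2 * b1 + (1 - alpha)**2 * b2 + alpha * (1 - alpha) * cross

   For 1x1 matrices this reduces to
   ``(alpha * sigma1 + (1 - alpha) * sigma2)**2``, i.e., the standard
   deviations mix linearly, as expected for perfectly correlated errors.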
.. py:function:: get_localization(lon, lat, height, rtime, hscale, vscale)

   Construct a localization matrix based on coordinates.

   All input arrays must be one-dimensional and aligned.

   :param lon: longitude (degrees east)
   :type lon: np.ndarray
   :param lat: latitude (degrees north)
   :type lat: np.ndarray
   :param height: vertical coordinate in meters
   :type height: np.ndarray
   :param rtime: time relative to the localization time scale, with arbitrary offset
   :type rtime: np.ndarray
   :param hscale: horizontal localization scale in meters
   :type hscale: np.ndarray
   :param vscale: vertical localization scale in meters
   :type vscale: np.ndarray

   :returns: **localization_weights** -- localization matrix specifying weights for each pair of coordinates
   :rtype: np.ndarray


.. py:function:: get_localization_sparse(lon, lat, height, rtime, hscale, vscale, threshold=1e-05, chunk=500)

   Construct a sparse localization matrix based on coordinates, see :func:`get_localization`.


.. py:function:: chunk_corr_std(a, b, split)

   Compute correlation coefficients and standard deviations for chunks of data::

      result[0] = corr(a[0:split[0]], b[0:split[0]]),
      result[i] = corr(a[split[i-1]:split[i]], b[split[i-1]:split[i]]) for i > 0,

   where split[-1] == a.size == b.size.


.. py:function:: fill_chunks(a, split)

   Fill array chunks: result[split[i-1]:split[i]] = a[i].
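The chunking conventions of :func:`chunk_corr_std` and :func:`fill_chunks`
can be illustrated with a short NumPy sketch. This is a minimal illustration
of the documented indexing, not the packaged implementation: the helper names
are hypothetical, and the per-chunk standard deviation computed by
:func:`chunk_corr_std` is omitted here.

.. code-block:: python

   import numpy as np

   def chunk_corr_sketch(a, b, split):
       # Chunk i spans bounds[i]:bounds[i + 1]; split[-1] == a.size == b.size.
       bounds = np.concatenate(([0], split))
       return np.array([
           np.corrcoef(a[lo:hi], b[lo:hi])[0, 1]
           for lo, hi in zip(bounds[:-1], bounds[1:])
       ])

   def fill_chunks_sketch(values, split):
       # result[split[i - 1]:split[i]] = values[i]; np.repeat with the
       # per-chunk lengths reproduces this assignment.
       lengths = np.diff(np.concatenate(([0], split)))
       return np.repeat(values, lengths)

   # Two chunks: indices 0:3 and 3:6.
   split = np.array([3, 6])
   a = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
   b = np.array([1.0, 2.5, 2.0, 6.0, 5.0, 4.0])
   chunk_corr_sketch(a, b, split)                   # per-chunk correlations
   fill_chunks_sketch(np.array([1.0, 2.0]), split)  # [1., 1., 1., 2., 2., 2.]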