"""Signal processing."""
import warnings
from astropy import convolution as ap_conv
import numpy as np
from obspy.signal.cross_correlation import correlate, xcorr_max
import scipy as sp
from scipy.fft import fft, next_fast_len, rfft, rfftfreq
from scipy.optimize import curve_fit
from scipy.interpolate import CubicSpline
import scipy.ndimage
from scipy.signal import hilbert
#
# Coefficients for (central) finite difference computation
# https://www.wikiwand.com/en/Finite_difference_coefficient
# First derivatives
FDC_X_1 = {
2: np.array([[1, 0, -1]]) / 2,
4: np.array([[-1, 8, 0, -8, 1]]) / 12,
6: np.array([[1, -9, 45, 0, -45, 9, -1]]) / 60,
}
FDC_Y_1 = {order: coef.T for order, coef in FDC_X_1.items()}
# Second derivatives
FDC_X_2 = {
2: np.array([[1, -2, 1]]),
4: np.array([[-1, 16, -30, 16, -1]]) / 12,
6: np.array([[2, -27, 270, -490, 270, -27, 2]]) / 180,
}
FDC_Y_2 = {order: coef.T for order, coef in FDC_X_2.items()}
def _normal(x, *p):
"""
Definition of Gauss function to fit.
"""
A, mu, sigma = p
pdf = A * np.exp(-(x-mu)**2 / (2*sigma**2))
return pdf
def _skew_normal(x, *p):
"""
Definition of skewed normal distribution.
https://www.wikiwand.com/en/Skew_normal_distribution
"""
A, mu, sigma, skew = p
norm = np.exp(-(x-mu)**2 / (2*sigma**2))
erf = 1 + sp.special.erf(skew*(x-mu)/sigma/np.sqrt(2))
pdf = A * norm * erf
return pdf
def _laplace(x, *p):
"""
Laplace distribution.
"""
A, mu, sigma = p
pdf = A * np.exp(-np.abs(x-mu) / sigma)
return pdf
def _asymmetric_laplace(x, *p):
"""
Asymmetric Laplace distribution.
https://www.wikiwand.com/en/Asymmetric_Laplace_distribution
"""
A, mu, sigma, skew = p
s = np.sign(x - mu)
pdf = A * np.exp(- np.abs(x-mu) / sigma * skew**s)
return pdf
[docs]def fit_dist(y, dist='normal', **kwargs):
"""
Fit histogram to a distribution.
"""
hist, bin_edges = np.histogram(y, **kwargs)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
dist = dist.lower()
if dist in ['normal', 'gauss', 'gaussian']:
p0 = [np.max(hist), np.mean(y), np.std(y)]
func = _normal
elif dist in ['skew_normal']:
p0 = [np.max(hist), np.mean(y), np.std(y), sp.stats.skew(y)]
func = _skew_normal
elif dist in ['laplace']:
p0 = [np.max(hist), np.mean(y), np.std(y)]
func = _laplace
elif dist in ['asymmetric_laplace', 'ald']:
p0 = [np.max(hist), np.mean(y), np.std(y), sp.stats.skew(y)]
func = _asymmetric_laplace
popt, pcov = curve_fit(func, bin_centers, hist, p0=p0)
popt[2] = abs(popt[2])
hist_fit = func(bin_centers, *popt)
return popt, bin_centers, hist, hist_fit
[docs]def normalized(a, norm=None):
"""
Normalize 1D array to its absolute maximum.
https://docs.obspy.org/_modules/obspy/core/trace.html#Trace.normalize
"""
# normalize, use norm-kwarg otherwise normalize to 1
if norm is not None:
norm = norm
if norm < 0:
msg = "Normalizing with negative values is forbidden. " + \
"Using absolute value."
warnings.warn(msg)
else:
norm = np.abs(a).max()
# Don't do anything for zero norm but raise a warning.
if not norm:
msg = ("Attempting to normalize by dividing through zero. This "
"is not allowed and the data will thus not be changed.")
warnings.warn(msg)
return a
# Convert data if it's not a floating point type.
if not np.issubdtype(a.dtype, float):
a = np.require(a, dtype=np.float64)
return a / abs(norm)
[docs]def rms(a):
"""
Root mean square.
"""
if (a.size == 0) or np.all(a == 0):
warnings.warn('Empty array.')
return None
else:
return np.sqrt((a**2).sum() / a.size)
[docs]def corr_coef(reference, current, shift=None, demean=True, abs_max=True,
domain='freq'):
"""
Return shift and value of maximum of cross-correlation.
"""
if shift == 0:
domain = 'time'
fct = correlate(reference, current, shift=shift,
demean=demean, normalize=True, domain=domain)
return xcorr_max(fct, abs_max=abs_max)
[docs]def ddot(x, y, interp=True):
"""
Return 2nd order derivative.
"""
if interp:
iinc = np.argsort(x)
_x = x[iinc]
_y = y[iinc]
fy = CubicSpline(_x, _y)
yddot = fy.derivative(2)(x)
else:
ydot = np.gradient(y, x)
yddot = np.gradient(ydot, x)
return yddot
[docs]def inst_ph(a):
"""
Return instantaneous phase.
"""
return np.unwrap(np.angle(hilbert(a)))
[docs]def corr2(a, b):
"""
2-D correlation coefficient.
https://www.mathworks.com/help/images/ref/corr2.html#f1-227958
"""
a_mean = a.mean()
b_mean = b.mean()
a_diff = a - a_mean
b_diff = b - b_mean
numerator = np.nansum(a_diff * b_diff)
denominator = np.sqrt(np.nansum(a_diff**2) * np.nansum(b_diff**2))
return numerator / denominator
[docs]def ma_corr2(a, b):
"""
Similar to :meth:`corr2()` but for masked array.
"""
mask = a.mask + b.mask
a_mean = a.mean()
b_mean = b.mean()
a_diff = a - a_mean
a_diff.mask += mask
b_diff = b - b_mean
b_diff.mask += mask
numerator = np.nansum(a_diff * b_diff)
denominator = np.sqrt(np.nansum(a_diff**2) * np.nansum(b_diff**2))
return numerator / denominator
[docs]def nextpow2(n):
"""
Exponent of next higher power of 2.
https://www.mathworks.com/help/matlab/ref/nextpow2.html
"""
return np.ceil(np.log2(n))
[docs]def intersect2d(a, b, assume_unique=False, return_indices=False):
"""
Find the intersection of two 2-D arrays.
https://stackoverflow.com/a/8317403/8877268
:param assume_unique:
:param return_indices:
"""
av = a.view([('', a.dtype)]*a.shape[1]).ravel()
bv = b.view([('', b.dtype)]*b.shape[1]).ravel()
res = np.intersect1d(
av, bv,
assume_unique=assume_unique,
return_indices=return_indices,
)
if return_indices:
int2d = res[0].view(a.dtype).reshape(-1, a.shape[1])
return int2d, res[1], res[2]
else:
int2d = res.view(a.dtype).reshape(-1, a.shape[1])
return int2d
[docs]def spectrum(a, delta=1, **kwargs):
"""
Return frequencies, amplitude and phase of FFT.
"""
nfreq = kwargs.get('nfreq')
if nfreq is None:
nfreq = next_fast_len(a.size)
spectrum = rfft(a, nfreq)
freqs = rfftfreq(n=nfreq, d=delta)
am = np.abs(spectrum)
ph = np.angle(spectrum)
return freqs, am, ph
[docs]def nsig(x, n=2):
"""
Round to n signifcant figures.
https://stackoverflow.com/a/3411435/8877268
"""
return round(x, -int(np.floor(np.log10(x))) + (n - 1))
[docs]def linrange(start, stop, step=1):
"""
Mix of arange & linspace.
"""
if (stop-start) % step != 0:
raise ValueError('Can not include end')
num = int(abs(stop-start)/step + 1)
return np.linspace(start, stop, num)
[docs]def conv_spc(in1, in2):
"""
Return spectra of convolution (spectral multiplication)
of two 1-D signals.
https://github.com/scipy/scipy/blob/v1.6.0/scipy/signal/signaltools.py#L551-L663
"""
in1 = np.asarray(in1)
in2 = np.asarray(in2)
if in1.ndim == in2.ndim == 0: # scalar inputs
return in1 * in2
elif in1.ndim != in2.ndim:
raise ValueError("in1 and in2 should have the same dimensionality")
elif in1.size == 0 or in2.size == 0: # empty arrays
return np.array([])
# Speed up FFT by padding to optimal size
fshape = next_fast_len(in1.size+in2.size-1)
sp1 = fft(in1, fshape)
sp2 = fft(in2, fshape)
return sp1 * sp2
[docs]def norm_weight(w, axis=0):
"""
Normalize weights.
"""
w_sum = np.nansum(w, axis=axis)
w_sum_all = np.broadcast_to(w_sum, w.shape)
has_weight = (w_sum_all != 0)
w[has_weight] = w[has_weight] / w_sum_all[has_weight]
return w
[docs]def weighted_mean(x, w, axis=0):
"""
https://www.wikiwand.com/en/Weighted_arithmetic_mean#/Mathematical_definition
"""
w_sum = np.nansum(w, axis=axis)
has_weight = (w_sum != 0)
mean = np.zeros(w_sum.shape)
mean[has_weight] = np.nansum(x*w, axis=axis)[has_weight] / w_sum[has_weight]
return mean, w_sum, has_weight
[docs]def weighted_sd(x, w, axis=0):
"""
https://www.wikiwand.com/en/Weighted_arithmetic_mean#/Weighted_sample_variance
"""
mean, w_sum, has_weight = weighted_mean(x, w, axis)
mean_bc = np.broadcast_to(mean, x.shape)
numerator = np.nansum(w * (x-mean_bc)**2, axis=axis)[has_weight]
denomitor = w_sum[has_weight]
sd = np.zeros(mean.shape)
sd[has_weight] = np.sqrt(numerator / denomitor)
return sd, mean
[docs]def weighted_sem(x, w, axis=0):
"""
No n/(n-1) factor.
https://www.wikiwand.com/en/Weighted_arithmetic_mean#/Bootstrapping_validation
"""
mean, w_sum, has_weight = weighted_mean(x, w, axis)
mean_bc = np.broadcast_to(mean, x.shape)
numerator = np.nansum(w**2 * (x-mean_bc)**2, axis=axis)[has_weight]
denomitor = w_sum[has_weight]**2
sem = np.zeros(mean.shape)
sem[has_weight] = np.sqrt(numerator / denomitor)
return sem, mean
[docs]def gaussian_filter(a, x_stddev, y_stddev=None, **kwargs):
"""
http://docs.astropy.org/en/stable/convolution/index.html
"""
kwargs_def = {
'fill_value': np.nan,
'preserve_nan': True,
# 'boundary': 'extend',
# 'nan_treatment': 'interpolate',
}
a2 = ap_conv.convolve(
a,
kernel=ap_conv.Gaussian2DKernel(x_stddev, y_stddev),
**{**kwargs_def, **kwargs}
)
return a2
[docs]def gradient_2d(z, dx, dy, order=2):
"""
Not use Numpy because np.gradient dx or dy must be 1D.
https://github.com/numpy/numpy/issues/9401
https://github.com/Unidata/MetPy/issues/174
"""
coef_x = FDC_X_1[order]
coef_y = FDC_Y_1[order]
zx = sp.ndimage.convolve(z, coef_x) / dx
zy = sp.ndimage.convolve(z, coef_y) / dy
return zx, zy
def _lplc_green(zx, zy, dx, dy):
"""
Green's first identity with psi=constant
https://www.wikiwand.com/en/Green%27s_identities#/Green's_first_identity
"""
zxp = zx[1:-1, 2:]
zxn = zx[1:-1, :-2]
# Positive y direction is opposite to latitude
zyn = zy[2:, 1:-1]
zyp = zy[:-2, 1:-1]
loopsum = (zxp-zxn) * 2*dy - (zyp-zyn) * 2*dx
area = 2*dy * 2*dx
lplc = loopsum / area
return lplc
def _lplc_fd_raw(z, dx, dy, order):
coef_x = FDC_X_2[order]
coef_y = FDC_Y_2[order]
z_xx = sp.ndimage.convolve(z, coef_x) / dx**2
z_yy = sp.ndimage.convolve(z, coef_y) / dy**2
lplc = z_xx + z_yy
return lplc
def _lplc_fd_gauss(z, dx, dy, sigma, mode='nearest', cval=0):
"""
"""
# Gaussian
# https://github.com/scipy/scipy/blob/adc4f4f7bab120ccfab9383aba272954a0a12fb0/scipy/ndimage/filters.py#L452-L497
kw = {'sigma': sigma, 'mode': mode, 'cval': cval}
# lplc = sp.ndimage.gaussian_laplace(z, **kw) / dy**2
z_xx = sp.ndimage.gaussian_filter(z, order=[0, 2], **kw) / dx**2
z_yy = sp.ndimage.gaussian_filter(z, order=[2, 0], **kw) / dy**2
lplc = z_xx + z_yy
return lplc
def laplacian_2d(dx, dy, z=None, zx=None, zy=None, method='Green', **kwargs):
method = method.lower()
if method in ['green']:
lplc = _lplc_green(zx, zy, dx, dy)
elif method in ['fd', 'finite_difference']:
lplc = _lplc_fd_gauss(z, dx, dy, **kwargs)
else:
raise ValueError(f'Unknow method {method}')
return lplc