# -*- coding: utf-8 -*-
# distrib.py
import itertools
from collections.abc import Iterable
from operator import itemgetter
import matplotlib.font_manager as font_manager
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.integrate
import scipy.interpolate
from .plotting import plotVertBar
from .utils import grouper
[docs]
def integrate(xvec, yvec):
return abs(scipy.integrate.simps(yvec, x=xvec))
[docs]
def normalizeDistrib(x, y, u=None):
x = x.values if isinstance(x, pd.Series) else x
y = y.values if isinstance(y, pd.Series) else y
# normalize the distribution to area of 1
norm = integrate(x, y)
# print("CONTINs norm", norm)
y /= norm
if u is not None:
u /= norm
return x, y, u
[docs]
def area(xvec, yvec, showArea=True):
"""Returns a string with the area value of the given discrete curve points."""
return r" $\int${:.3g}".format(integrate(xvec, yvec)) if showArea else ""
[docs]
def findPeakRanges(x, y, tol=1e-16):
"""Returns the location of data/peak above a base line.
Assumes it touches the baseline before and after. For distributions.
*tol*: Multiplied by Y to produce a threshold to distinguish noise/artifacts from peaks."""
x = x.values if isinstance(x, pd.Series) else x
y = y.values if isinstance(y, pd.Series) else y
# look at all data above zero, get their array indices
indices = np.where(y > tol * y.max())[0]
# segmentation: look where continous groups of indices start and end
indexGroups = np.where(np.diff(indices) > 1)[0]
ranges = []
istart = indices[0]
def appendPeakRange(start, end):
# print("appending", start, end, end-start)
start, end = max(start - 1, 0), min(end + 1, len(x) - 1)
monotony = np.sign(np.diff(y[start : end + 1]))
if not all(monotony == monotony[0]):
# avoid monotonously increasing/decreasing peaks -> unwanted artefacts
ranges.append((start, end))
for idx in indexGroups:
appendPeakRange(istart, indices[idx]) # add the new range to the list
istart = indices[idx + 1] # start new range
appendPeakRange(istart, indices[-1])
# print("findPeakRanges", ranges)
return ranges
[docs]
def findLocalMinima(peakRanges, xarr, yarr, doPlot=False, verbose=False):
"""Identify local (non-zero) minima within given peak ranges and separate those
bimodal ranges into monomodal ranges, thus splitting up the peak range if it contains
maxima connected by non-zero minima. Returns a list of index tuples indicating the
start and end of each peak. Uses 4th order spline fitting and its derivative
for finding positions of local minima."""
# print("findLocalMinima", peakRanges)
newRanges = []
if doPlot:
plt.figure(figsize=(15, 5))
for ip, (istart, iend) in enumerate(peakRanges):
if verbose:
print((istart, iend), xarr[istart], xarr[iend])
if iend - istart < 5: # skip this, can't be fitted and no sub-peaks are likely
newRanges.append((istart, iend))
continue
while yarr[istart] <= 0.0 and istart < iend:
istart += 1 # exclude leading zero
while yarr[iend] <= 0.0 and istart < iend:
iend -= 1 # exclude trailing zero
if istart == iend:
continue
if verbose:
print((istart, iend))
x, y = xarr[istart : iend + 1], yarr[istart : iend + 1]
try:
spline = scipy.interpolate.InterpolatedUnivariateSpline(x, y, k=4)
except Exception:
print(f"Warning: Could not findLocalMinima() within {(istart, iend)}!")
newRanges.append((istart, iend))
continue
# if verbose: print(spline(x))
deriv = spline.derivative()
# if verbose: print(deriv(x))
roots = deriv.roots()
# get indices of roots and ignore any duplicate indices
rootIdx = set(np.argmin(np.abs(xarr[:, np.newaxis] - roots[np.newaxis, :]), axis=0))
rootIdx.add(istart)
rootIdx.add(iend)
rootIdx = sorted(rootIdx)
# if rootIdx[0] == istart: # omit the first root at the beginning
# rootIdx = rootIdx[1:]
if verbose:
print((istart, iend), len(roots), roots, rootIdx)
if doPlot:
plt.subplot(1, len(peakRanges), ip + 1)
radGrid = np.linspace(x[0], x[-1], 200)
plt.plot(x, y, label="data")
plt.plot(radGrid, spline(radGrid), label="spline"),
plt.ylabel("data & spline approx.")
handles1, labels1 = plt.gca().get_legend_handles_labels()
[
plotVertBar(plt, xarr[i], spline(radGrid).max(), color="blue", ls=":")
for i in rootIdx
]
plt.gca().twinx()
plt.plot(radGrid, deriv(radGrid), label="deriv. spline", color="green")
plt.ylabel("1st derivative")
handles2, labels2 = plt.gca().get_legend_handles_labels()
plt.grid()
plt.legend(handles1 + handles2, labels1 + labels2)
peakBoundaries = rootIdx[::2]
if verbose:
print(peakBoundaries)
newRanges += [tuple(peakBoundaries[i : i + 2]) for i in range(len(peakBoundaries) - 1)]
if verbose:
print(newRanges)
return newRanges
[docs]
def getLargestPeaks(peakRanges, xarr, yarr, count=1):
def peakRangeArea(peakRange):
return integrate(
xarr[peakRange[0] : peakRange[1] + 1], yarr[peakRange[0] : peakRange[1] + 1]
)
return sorted(peakRanges, key=peakRangeArea, reverse=True)[:count]
[docs]
class Moments(dict):
[docs]
@staticmethod
def nthMoment(x, weights, n):
"""Calculates the nth moment of the given distribution weights."""
center = 0
if n > 0: # calculate the mean first
center = np.average(x, weights=weights) if sum(weights) else 0.0
# np.sqrt(u**2)/len(u) # center uncertainty
if n == 1:
return center # the mean
var = 1.0
if n > 1:
var = np.sum(weights * (x - center) ** 2) / np.sum(weights)
if n == 2:
return var # the variance
return np.sum(weights * (x - center) ** n) / np.sum(weights) / var**n
@classmethod
def fromData(cls, x, y):
store = cls()
mean, var, skew, kurt = [cls.nthMoment(x, y, i) for i in range(1, 5)]
store["area"] = integrate(x, y)
store["mean"] = mean
store["var"] = var
store["skew"] = skew
store["kurt"] = kurt
return store
@property
def area(self):
return self["area"]
@property
def mean(self):
return self["mean"]
@property
def var(self):
return self["var"]
@property
def skew(self):
return self["skew"]
@property
def kurt(self):
return self["kurt"]
def __str__(self):
return "\n".join(
[
"{: <4s}: {: 9.2g}".format(k, self[k])
for k in ("area", "mean", "var", "skew", "kurt")
]
)
@staticmethod
def logNormParFromMoments(mean, var, N=1.0):
# SASfit manual, 6.4. Log-Normal distribution
median = mean**2 / np.sqrt(var + mean**2)
sigma = np.sqrt(np.log(mean**2 / median**2))
return {"N": N, "sigma": sigma, "median": median}
def logNormPar(self, N=1.0):
return self.logNormParFromMoments(self.mean, self.var, N=N)
[docs]
class Distribution:
x, y, u = None, None, None
peaks = None # list of peak (start, end) indices pointing into x,y,u
color = None
xlabel = None
plotAxes, plotAxisIdx = None, 0
[docs]
def __init__(self, xvec, yvec, uvec, xlabel=None, maxPeakCount=None):
self.xlabel = getattr(xvec, "name", None)
xvec = xvec.values if isinstance(xvec, pd.Series) else xvec
yvec = yvec.values if isinstance(yvec, pd.Series) else yvec
uvec = uvec.values if isinstance(uvec, pd.Series) else uvec
self.x, self.y, self.u = normalizeDistrib(xvec, yvec, uvec)
if xlabel is not None:
self.xlabel = xlabel
self.peaks = findPeakRanges(self.x, self.y, tol=1e-6)
# refine the peak ranges containing multiple maxima
self.peaks = findLocalMinima(self.peaks, self.x, self.y)
# For a given list of peaks (by start/end indices) return only those
# whose ratio of amount to uncertainty ratio is always below the given max. ratio
# maxRatio = 1.5
# self.peakRanges = [(istart, iend) for istart, iend in self.peakRanges
# if maxRatio > 1/np.median(self.y[istart:iend+1]/self.u[istart:iend+1])]
# Sort the peaks by area and use the largest (last) only, assuming monomodal distributions
if maxPeakCount:
self.peaks = getLargestPeaks(self.peaks, self.x, self.y, count=maxPeakCount)
def peakData(self, peakRange):
return (
self.x[peakRange[0] : peakRange[1] + 1],
self.y[peakRange[0] : peakRange[1] + 1],
self.u[peakRange[0] : peakRange[1] + 1],
)
def uncertRatioMedian(self, peakRange):
_, y, u = self.peakData(peakRange)
return 1.0 / np.median(y / u)
@staticmethod
def getBarWidth(xvec):
return np.concatenate((np.diff(xvec)[:1], np.diff(xvec)))
[docs]
def plotPeak(self, peakRange, mom, momLo, momHi, dp, dpLo, dpHi, showFullRange=False, ax=None):
"""
*showFullRange*: Set the x range to cover the whole distribution instead of the peak only.
"""
x, y, u = self.peakData(peakRange)
if not ax:
ax = plt.gca()
# ax.plot(x, y, 'o', color=cls.color)
lbl, fmt = [], "{: <7s} {: 9.2g} ±{: 9.2g}"
for k in "area", "median", "var", "skew", "kurt":
if k == "median":
lbl.append(
fmt.format(
"median:",
dp["median"],
max(abs(dp["median"] - dpLo["median"]), abs(dpHi["median"] - dp["median"])),
)
)
else:
lbl.append(
fmt.format(k + ":", mom[k], max(abs(mom[k] - momLo[k]), abs(momHi[k] - mom[k])))
)
lbl.append("LogNorm: " + distrParToText(dp)[0])
ax.bar(x, y, width=self.getBarWidth(x), color=self.color, alpha=0.5, label="\n".join(lbl))
ax.fill_between(
x,
np.maximum(0, y - u),
y + u,
color="red",
lw=0,
alpha=0.1,
label=f"uncertainties (lvl: {self.uncertRatioMedian(peakRange):.3g})",
)
if showFullRange:
ax.set_xlim((self.x.min(), self.x.max()))
ax.set_xlabel(self.xlabel)
ax.grid(True)
legend = ax.legend(prop=font_manager.FontProperties(family="monospace"))
# make the legend background more transparent
legend.get_frame().set_alpha(None)
legend.get_frame().set_facecolor((1, 1, 1, 0.2))
[docs]
def plot(self, ax, distPar, name=""):
"""plot complete distribution as loaded from file"""
lbl = (
"from file, "
+ name
+ area(self.x, self.y, showArea=True)
+ "\n"
+ distrParLatex(distPar[0])
)
ax.fill_between(
self.x,
self.y,
# width=GenericResult.getBarWidth(self.x),
color=self.color,
alpha=0.5,
label=lbl,
)
# ax.errorbar(self.x, self.y, yerr=self.u, lw=lineWidth()*2, label=lbl)
ax.fill_between(
self.x,
np.maximum(0, self.y - self.u),
self.y + self.u,
color="red",
lw=0,
alpha=0.1,
label="uncertainties",
)
ax.set_xlabel(self.xlabel)
ax.legend()
ax.grid()
ax.set_xscale("log")
def moments(self):
def momentsByPeak():
for peakRange in self.peaks:
x, y, u = self.peakData(peakRange)
N = integrate(x, y)
mom = Moments.fromData(x, y)
momLo = Moments.fromData(x, np.maximum(0, y - u))
momHi = Moments.fromData(x, y + u)
lnp = mom.logNormPar(N=N)
lnpLo = momLo.logNormPar(N=N)
lnpHi = momHi.logNormPar(N=N)
yield (peakRange, mom, momLo, momHi, lnp, lnpLo, lnpHi)
# return a dict of lists, addressable by peak index
return dict(
zip(
["peakRange", "mom", "momLo", "momHi", "lnp", "lnpLo", "lnpHi"],
zip(*[m for m in momentsByPeak()]),
)
)
def peakDistrPar(self, plotAxes=None, plotAxisStart=0, **plotPeakKwargs):
momentsAndLogNormPar = self.moments()
if plotAxes is not None:
for i, peakRange in enumerate(momentsAndLogNormPar["peakRange"]):
plotPeakKwargs["ax"] = plotAxes[plotAxisStart + i]
self.plotPeak(*[v[i] for v in momentsAndLogNormPar.values()], **plotPeakKwargs)
return momentsAndLogNormPar["lnp"], momentsAndLogNormPar["mom"]
[docs]
def distrParToText(logNormPar):
"""
>>> distrParToText({'N':1.1, 'sigma':0.15, 'median':33.1234e-9})
['median=3.3e-08 sigma=0.15 N=1.1']
>>> distrParToText({'N':1.2e13, 'sigma':0.15, 'median':3.1234})
['median=3.1 sigma=0.15 N=1.2e+13']
>>> distrParToText({'N':(1.,2.), 'sigma':(.2,.4), 'median':(40e-9,7e-8)})
['median_0=4e-08 sigma_0=0.20 N_0=1', 'median_1=7e-08 sigma_1=0.40 N_1=2']
"""
fmt = {"median": "{:.2g}", "sigma": "{:.2f}", "N": "{:.2g}"}
order = {key: list(fmt.keys()).index(key) for key in fmt.keys()}
return [
" ".join(p)
for p in grouper(
list(
zip(
*sorted(
[
(
i * 10 + order[key],
f"{key}"
+ (f"_{i}" if isinstance(vals, Iterable) else "")
+ "="
+ fmt[key].format(v),
)
for key, vals in logNormPar.items()
for i, v in enumerate(vals if isinstance(vals, Iterable) else [vals])
],
key=itemgetter(0),
)
)
)[-1],
3,
)
]
[docs]
def distrParLatex(distrPar, *kwargs):
r"""
>>> distrParLatex({'N':1.1, 'sigma':0.15, 'median':33e-9})
'$median=3.3e-08\\;sigma=0.15\\;N=1.1$'
>>> distrParLatex({'N':(1.,2.), 'sigma':(.2,.4), 'median':(40e-9,7e-8)})
'$median_0=4e-08\\;sigma_0=0.20\\;N_0=1$\n$median_1=7e-08\\;sigma_1=0.40\\;N_1=2$'
"""
return "\n".join(["$" + txt.replace(" ", r"\;") + "$" for txt in distrParToText(distrPar)])
[docs]
def distrParToFilename(distrPar, prefix=""):
"""
>>> distrParToFilename({'N':1.1, 'sigma':0.15, 'median':33e-9})
'_median=3.3e-08_sigma=0.15_N=1.1'
>>> distrParToFilename({'N':(1.,2.), 'sigma':(.2,.4), 'median':(40e-9,7e-8)})
'_median_0=4e-08_sigma_0=0.20_N_0=1_median_1=7e-08_sigma_1=0.40_N_1=2'
"""
return "_".join([prefix] + distrParToText(distrPar)).replace(" ", "_")
[docs]
def distrParFromFilename(fn):
"""
>>> distrParFromFilename('_median=_33_sigma=0.15_N=1.1') == {'N':1.1, 'sigma':0.15, 'median':33}
True
>>> fn = '_median_0=4e-08_sigma_0=0.20_N_0=1_median_1=7e-08_sigma_1=0.40_N_1=2'
>>> distrParFromFilename(fn) == {'N':(1.,2.), 'sigma':(.2,.4), 'median':(40e-9,7e-8)}
True
"""
fn = fn.split("=")
fn = [elem.lstrip("_") for elem in fn]
fn = [(elem.split("_", maxsplit=1) if elem[0].isnumeric() else [elem]) for elem in fn]
fn = list(itertools.chain(*fn))
result = {}
for k, v in grouper(fn, 2):
key = k.split("_")[0]
value = float(v) if "median" == key else float(v)
result[key] = (
value
if key not in result
else (
result[key] + (value,) if isinstance(result[key], tuple) else (result[key], value)
)
)
return result