from pathlib import Path
from typing import Optional
import numpy as np
import mcsas3.McHDF as McHDF
from .mcmodel import McModel
from .mcopt import McOpt
from .osb import optimizeScalingAndBackground
class McCore:
"""
The core of the MC procedure.
Parameters
----------
modelFunc:
SasModels function
measData: dict
measurement data dictionary with Q, I, ISigma containing arrays.
For 2D data, Q is a two-element list with [Qx, Qy].
This is why it's not a Pandas Dataframe.
pickParameters: dict
dict of values with new random picks, named by parameter names
modelParameterLimits: dict
dict of value pairs (tuples) with random pick bounds,
named by parameter names
x0:
continually updated new guess for total scaling, background values.
weighting:
volume-weighting / compensation factor for the contributions
nContrib:
number of contributions
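
    Example
    -------
    A minimal usage sketch, assuming ``measData`` is a prepared measurement
    dict and ``model``/``opt`` are configured McModel/McOpt instances
    (filename illustrative)::

        core = McCore(measData=measData, model=model, opt=opt)
        core.optimize()
        core.store(Path("output.h5"))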
"""
_measData = None # measurement data dict with entries for Q, I, ISigma
_model = None # instance of McModel
_opt = None # instance of McOpt
_OSB = None # optimizeScalingAndBackground instance for this data
_outputFilename = None # store output data in here (HDF5)
    def __init__(
self,
measData: dict = None,
model: McModel = None,
opt: McOpt = None,
loadFromFile: Optional[Path] = None,
loadFromRepetition: Optional[int] = None,
resultIndex: int = 1,
):
# make sure we reset state:
self._measData = None
self._model = None
self._opt = None
self._OSB = None
self._outputFilename = None
        assert measData is not None, "measurement data must be provided to McCore"
        assert isinstance(
            measData, dict
        ), "measurement data must be a dict with Q, I, and ISigma"
self._measData = measData
# make sure we store and read from the right place.
self.resultIndex = McHDF.ResultIndex(resultIndex) # defines the HDF5 root path
if loadFromFile is not None:
self.load(loadFromFile, loadFromRepetition, resultIndex=resultIndex)
testGof, testX0 = self._opt.gof, self._opt.x0
else:
self._model = model
self._opt = opt # McOpt instance
self._opt.step = 0 # number of iteration steps
self._opt.accepted = 0 # number of accepted iterations
self._opt.acceptedSteps = []
self._opt.acceptedGofs = []
self._OSB = optimizeScalingAndBackground(measData["I"], measData["ISigma"])
# set default parameters:
self._model.func.info.parameters.defaults.update(self._model.staticParameters)
# generate kernel
self._model.kernel = self._model.func.make_kernel(self._measData["Q"])
# calculate scattering intensity by combining intensities from all contributions
self.initModelI()
        # calculate the initial GOF measure; the initial case is handled in osb when x0 is None
        self._opt.gof = self.evaluate()
# store the initial background and scaling optimization as new initial guess:
self._opt.x0 = self._opt.testX0
self._opt.acceptedSteps += [0]
self._opt.acceptedGofs += [self._opt.gof]
        return
        # The consistency checks below are intentionally never reached: they were meant
        # to verify that a previously stored optimization result reproduces the same GOF
        # and scaling/background, but at least one of them always fails for test data.
if loadFromFile is not None:
np.testing.assert_approx_equal(
testGof,
self._opt.gof,
significant=3,
err_msg="goodness-of-fit mismatch between loaded results and new calculation",
)
np.testing.assert_approx_equal(
testX0[0],
self._opt.x0[0],
significant=3,
err_msg="scaling factor mismatch between loaded results and new calculation",
)
np.testing.assert_approx_equal(
testX0[1],
self._opt.x0[1],
significant=3,
err_msg="background mismatch between loaded results and new calculation",
)
    def initModelI(self) -> None:
"""calculate the total intensity from all contributions"""
        # calculate a first contribution to establish the intensity array shape:
        I, V = self._model.calcModelIV(self._model.parameterSet.loc[0].to_dict())
# zero-out all previously stored values for intensity and volume
self._opt.modelI = np.zeros(I.shape)
self._model.volumes = np.zeros(self._model.nContrib)
# add the intensity of every contribution
for contribi in range(self._model.nContrib):
            I, V = self._model.calcModelIV(self._model.parameterSet.loc[contribi].to_dict())
            # intensity is added, NOT normalized by the number of contributions:
            # volume normalization is already done in SasModels, so the intensities
            # arriving from there are already volume-weighted.
            self._opt.modelI += I
# we store the volumes anyway since we may want to use them later
# for showing alternatives of number-weighted, or volume-squared weighted histograms
self._model.volumes[contribi] = V
    def evaluate(self, testData: Optional[np.ndarray] = None) -> float:
        """scale and calculate the goodness-of-fit (GOF) from all contributions"""
        # note: takes about 20 ms per call; the initial case is handled in osb when x0 is None
if testData is None:
testData = self._opt.modelI
# this function takes quite a while:
self._opt.testX0, gof = self._OSB.match(testData, self._opt.x0)
return gof
def contribIndex(self) -> int:
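        """index of the contribution currently being updated, cycling through all contributions"""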
return self._opt.step % self._model.nContrib
    def reEvaluate(self) -> float:
"""replace single contribution with new contribution, recalculate intensity and GOF"""
# calculate old intensity to subtract:
Iold, dummy = self._model.calcModelIV(
self._model.parameterSet.loc[self.contribIndex()].to_dict()
)
# calculate new intensity to add:
Ipick, Vpick = self._model.calcModelIV(self._model.pickParameters)
        # only one contribution changes per step, so update the total intensity by
        # removing the old contribution and adding the pick, instead of re-summing
        # all nContrib contributions:
        self._opt.testModelI = self._opt.modelI + (Ipick - Iold)
# store pick volume in temporary location
self._opt.testModelV = Vpick
# recalculate reduced chi-squared for this option
return self.evaluate(self._opt.testModelI)
    def reject(self) -> None:
        """reject pick"""
        # nothing to do here; kept as a hook for tracking a running rejection/acceptance ratio later
        pass
    def accept(self) -> None:
"""accept pick"""
# store parameters of accepted pick:
self._model.parameterSet.loc[self.contribIndex()] = self._model.pickParameters
# store calculated intensity as new total intensity:
self._opt.modelI = self._opt.testModelI
# store new pick volume to the set of volumes:
self._model.volumes[self.contribIndex()] = self._opt.testModelV
# store latest scaling and background values as new initial guess:
self._opt.x0 = self._opt.testX0
self._opt.acceptedSteps += [self._opt.step] # step at which we accepted
self._opt.acceptedGofs += [self._opt.gof] # gof at which we accepted
# add one to the accepted moves counter:
self._opt.accepted += 1
    def iterate(self) -> None:
"""pick, re-evaluate and accept/reject"""
# pick new model parameters:
self._model.pick() # 3 µs
# calculate GOF for the new total set:
newGof = self.reEvaluate() # 2 ms
# if this is an improvement:
if newGof < self._opt.gof:
# accept the move:
self.accept() # 500 µs
# and store the new GOF as current:
self._opt.gof = newGof
# increment step counter in either case:
self._opt.step += 1
    def optimize(self) -> None:
        """iterate until maxAccept accepted moves, maxIter iterations,
        or the convergence criterion convCrit is reached, whichever comes first"""
print("Optimization of repetition {} started:".format(self._opt.repetition))
print(
"chiSqr: {}, N accepted: {} / {}".format(
self._opt.gof, self._opt.accepted, self._opt.step
)
)
# continue optimizing until we reach any of these targets:
        while (
            (self._opt.accepted < self._opt.maxAccept)  # below max accepted moves
            and (self._opt.step < self._opt.maxIter)  # below max iterations
            and (self._opt.gof > self._opt.convCrit)  # convergence criterion not yet reached
        ):
self.iterate()
            # report progress every 1000 steps:
if self._opt.step % 1000 == 1:
print(
"chiSqr: {}, N accepted: {} / {}".format(
self._opt.gof, self._opt.accepted, self._opt.step
)
)
    def store(self, filename: Path) -> None:
        """stores the resulting model parameter set of a single repetition in the
        extended NXcanSAS file, ready for histogramming"""
self._outputFilename = filename
self._model.store(filename=self._outputFilename, repetition=self._opt.repetition)
self._opt.store(
filename=self._outputFilename,
path=self.resultIndex.nxsEntryPoint
/ "optimization"
/ f"repetition{self._opt.repetition}",
)
    def load(self, loadFromFile: Path, loadFromRepetition: int, resultIndex: int = 1) -> None:
        """loads the configuration and set-up from the extended NXcanSAS file"""
assert (
loadFromRepetition is not None
), "When you are loading from a file, a repetition index must be specified"
self._model = McModel(
loadFromFile=loadFromFile,
loadFromRepetition=loadFromRepetition,
resultIndex=resultIndex,
)
self._opt = McOpt(
loadFromFile=loadFromFile,
loadFromRepetition=loadFromRepetition,
resultIndex=resultIndex,
)
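        # note: after load(), the McCore constructor re-derives the total model intensity
        # and GOF from the restored parameter set via initModelI() and evaluate().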