Source code for mcsas3.mccore

# import pandas
# import sasmodels
from pathlib import Path
from typing import Optional

import numpy as np

# import scipy.optimize
import mcsas3.McHDF as McHDF

from .mcmodel import McModel
from .mcopt import McOpt
from .osb import optimizeScalingAndBackground


[docs]class McCore: """ The core of the MC procedure. Parameters ---------- modelFunc: SasModels function measData: dict measurement data dictionary with Q, I, ISigma containing arrays. For 2D data, Q is a two-element list with [Qx, Qy]. This is why it's not a Pandas Dataframe. pickParameters: dict dict of values with new random picks, named by parameter names modelParameterLimits: dict dict of value pairs (tuples) with random pick bounds, named by parameter names x0: continually updated new guess for total scaling, background values. weighting: volume-weighting / compensation factor for the contributions nContrib: number of contributions """ _measData = None # measurement data dict with entries for Q, I, ISigma _model = None # instance of McModel _opt = None # instance of McOpt _OSB = None # optimizeScalingAndBackground instance for this data _outputFilename = None # store output data in here (HDF5)
[docs] def __init__( self, measData: dict = None, model: McModel = None, opt: McOpt = None, loadFromFile: Optional[Path] = None, loadFromRepetition: Optional[int] = None, resultIndex: int = 1, ): # make sure we reset state: self._measData = None self._model = None self._opt = None self._OSB = None self._outputFilename = None assert measData is not None, "measurement data must be provided to McCore" assert isinstance( measData, dict ), "measurement data must be a dict with (Qx, Qy), I, and Isigma" self._measData = measData # make sure we store and read from the right place. self.resultIndex = McHDF.ResultIndex(resultIndex) # defines the HDF5 root path if loadFromFile is not None: self.load(loadFromFile, loadFromRepetition, resultIndex=resultIndex) testGof, testX0 = self._opt.gof, self._opt.x0 else: self._model = model self._opt = opt # McOpt instance self._opt.step = 0 # number of iteration steps self._opt.accepted = 0 # number of accepted iterations self._opt.acceptedSteps = [] self._opt.acceptedGofs = [] self._OSB = optimizeScalingAndBackground(measData["I"], measData["ISigma"]) # set default parameters: self._model.func.info.parameters.defaults.update(self._model.staticParameters) # generate kernel self._model.kernel = self._model.func.make_kernel(self._measData["Q"]) # calculate scattering intensity by combining intensities from all contributions self.initModelI() self._opt.gof = ( self.evaluate() ) # calculate initial GOF measure, initial happens when x0 is None # store the initial background and scaling optimization as new initial guess: self._opt.x0 = self._opt.testX0 self._opt.acceptedSteps += [0] self._opt.acceptedGofs += [self._opt.gof] return # one of the following tests always fails for test data, what's the purpose? # the hope was to test that the previously stored optimization result still # gives the same answer for GOF and scaling/background if loadFromFile is not None: np.testing.assert_approx_equal( testGof, self._opt.gof, significant=3, err_msg="goodness-of-fit mismatch between loaded results and new calculation", ) np.testing.assert_approx_equal( testX0[0], self._opt.x0[0], significant=3, err_msg="scaling factor mismatch between loaded results and new calculation", ) np.testing.assert_approx_equal( testX0[1], self._opt.x0[1], significant=3, err_msg="background mismatch between loaded results and new calculation", )
[docs] def initModelI(self) -> None: """calculate the total intensity from all contributions""" # set initial shape: I, V = self._model.calcModelIV(self._model.parameterSet.loc[0].to_dict()) # zero-out all previously stored values for intensity and volume self._opt.modelI = np.zeros(I.shape) self._model.volumes = np.zeros(self._model.nContrib) # add the intensity of every contribution for contribi in range(self._model.nContrib): I, V = self._model.calcModelIV(self._model.parameterSet.loc[contribi].to_dict()) # V = self.returnModelV() # intensity is added, NOT normalized by number of contributions. # volume normalization is already done in SasModels (!), # so we have volume-weighted intensities from there... self._opt.modelI += I # / self._model.nContrib # we store the volumes anyway since we may want to use them later # for showing alternatives of number-weighted, or volume-squared weighted histograms self._model.volumes[contribi] = V
[docs] def evaluate( self, testData: Optional[dict] = None ) -> ( float ): # , initial: bool = True): # takes 20 ms! initial is taken care of in osb when x0 is None """scale and calculate goodness-of-fit (GOF) from all contributions""" if testData is None: testData = self._opt.modelI # this function takes quite a while: self._opt.testX0, gof = self._OSB.match(testData, self._opt.x0) return gof
def contribIndex(self) -> int: return self._opt.step % self._model.nContrib
[docs] def reEvaluate(self) -> float: """replace single contribution with new contribution, recalculate intensity and GOF""" # calculate old intensity to subtract: Iold, dummy = self._model.calcModelIV( self._model.parameterSet.loc[self.contribIndex()].to_dict() ) # calculate new intensity to add: Ipick, Vpick = self._model.calcModelIV(self._model.pickParameters) # remove intensity from contribi from modelI # add intensity from Pick self._opt.testModelI = self._opt.modelI + (Ipick - Iold) # store pick volume in temporary location self._opt.testModelV = Vpick # recalculate reduced chi-squared for this option return self.evaluate(self._opt.testModelI)
[docs] def reject(self) -> None: """reject pick""" # nothing to do. Can be used to fish out a running rejection/acceptance ratio later pass
[docs] def accept(self) -> None: """accept pick""" # store parameters of accepted pick: self._model.parameterSet.loc[self.contribIndex()] = self._model.pickParameters # store calculated intensity as new total intensity: self._opt.modelI = self._opt.testModelI # store new pick volume to the set of volumes: self._model.volumes[self.contribIndex()] = self._opt.testModelV # store latest scaling and background values as new initial guess: self._opt.x0 = self._opt.testX0 self._opt.acceptedSteps += [self._opt.step] # step at which we accepted self._opt.acceptedGofs += [self._opt.gof] # gof at which we accepted # add one to the accepted moves counter: self._opt.accepted += 1
[docs] def iterate(self) -> None: """pick, re-evaluate and accept/reject""" # pick new model parameters: self._model.pick() # 3 µs # calculate GOF for the new total set: newGof = self.reEvaluate() # 2 ms # if this is an improvement: if newGof < self._opt.gof: # accept the move: self.accept() # 500 µs # and store the new GOF as current: self._opt.gof = newGof # increment step counter in either case: self._opt.step += 1
[docs] def optimize(self) -> None: """iterate until target GOF or maxiter reached""" print("Optimization of repetition {} started:".format(self._opt.repetition)) print( "chiSqr: {}, N accepted: {} / {}".format( self._opt.gof, self._opt.accepted, self._opt.step ) ) # continue optimizing until we reach any of these targets: while ( (self._opt.accepted < self._opt.maxAccept) # max accepted moves & (self._opt.step < self._opt.maxIter) # max iterations & (self._opt.gof > self._opt.convCrit) # max number of tries ): # convergence criterion reached self.iterate() # show me every 1000 steps where you are in the optimization: if self._opt.step % 1000 == 1: print( "chiSqr: {}, N accepted: {} / {}".format( self._opt.gof, self._opt.accepted, self._opt.step ) )
[docs] def store(self, filename: Path) -> None: """stores the resulting model parameter-set of a single repetition in the NXcanSAS object, ready for histogramming""" self._outputFilename = filename self._model.store(filename=self._outputFilename, repetition=self._opt.repetition) self._opt.store( filename=self._outputFilename, path=self.resultIndex.nxsEntryPoint / "optimization" / f"repetition{self._opt.repetition}", )
[docs] def load(self, loadFromFile: Path, loadFromRepetition: int, resultIndex: int = 1) -> None: """loads the configuration and set-up from the extended NXcanSAS file""" # not implemented yet assert ( loadFromRepetition is not None ), "When you are loading from a file, a repetition index must be specified" self._model = McModel( loadFromFile=loadFromFile, loadFromRepetition=loadFromRepetition, resultIndex=resultIndex, ) self._opt = McOpt( loadFromFile=loadFromFile, loadFromRepetition=loadFromRepetition, resultIndex=resultIndex, )