from pathlib import Path
from typing import List, Optional, Tuple
import numpy as np
import pandas
import sasmodels
import sasmodels.core
import sasmodels.direct_model
import sasmodels.product  # used for the ProductKernel check in McModel.calcModelIV
from scipy import interpolate
import mcsas3.McHDF as McHDF
# TODO: perhaps better defined as a dataclass with attrs
class sphereParameters(object):
    # micro-class to mimic the nested structure of SasModels in the simulation model:
defaults = {
"scale": 1.0,
"background": 0.0,
"sld": 1.0e-6,
"sld_solvent": 0,
"radius": 1,
}
    def __init__(self) -> None:
pass
# ibid.
class sphereInfo(object):
    # micro-class to mimic the nested structure of SasModels in the simulation model:
parameters = sphereParameters()
    def __init__(self) -> None:
pass
class mcsasSphereModel(object):
"""pretends to be a sasmodel, but just for a sphere - in case sasmodels give gcc errors"""
sld = None
sld_solvent = None
radius = None
# scale = None
# background = None
settables = ["sld", "sld_solvent", "radius", "scale", "background"]
measQ = None # needs to be set later when initializing
info = sphereInfo()
    def __init__(self, **kwargs: dict) -> None:
# reset values to make sure we're not inheriting anything from another instance:
self.sld = 1 # input SLD in units of 1e-6 1/A^2.
self.sld_solvent = 0
        self.radius = []  # sphere radius; may be set via kwargs, but the kernel reads the radius per contribution from parDict
        # self.scale = None
        # self.background = []
self.measQ = None # needs to be set later when initializing
self.info = sphereInfo()
# overwrites settings loaded from file if specified.
for key, value in kwargs.items():
assert (
key in self.settables
), "Key '{}' is not a valid settable option. Valid options are: \n {}".format(
key, self.settables
)
setattr(self, key, value)
# assert all([key in kwargs.keys()
# for key in ['simDataQ0', 'simDataQ1', 'simDataI', 'simDataISigma']]),
# 'The following input arguments must be provided to describe the simulation data:'
# 'simDataQ0, simDataQ1, simDataI, simDataISigma'
    def make_kernel(self, measQ: Optional[np.ndarray] = None):  # returns the kernel function itself (a bound method), mimicking sasmodels' make_kernel
self.measQ = measQ
return self.kernelfunc
def kernelfunc(self, **parDict: dict) -> Tuple[np.ndarray, np.ndarray]:
# print('stop here. see what we have. return I, V')
qr = self.measQ[0] * parDict["radius"]
F = 3.0 * (np.sin(qr) - qr * np.cos(qr)) / (qr**3.0)
V = (np.pi * 4.0 / 3.0) * parDict["radius"] ** 3
Int = (
V**2
# * self.scale
* ((self.sld - self.sld_solvent) / 1e2)
** 2 # WARNING: CONVERSION FACTOR PRESENT (1e2) to convert from 1/A^2 to 1/nm^2!!!
* F**2
)
return Int, V
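    # The kernel above evaluates the analytical monodisperse sphere form factor,
    #   F(qR) = 3 (sin(qR) - qR cos(qR)) / (qR)^3,  I = V^2 * (delta SLD)^2 * F^2,
    # matching the SasView sphere model up to the unit-conversion factor noted above
    # and the scale/background handled elsewhere. A minimal usage sketch follows;
    # the parameter values are illustrative assumptions and it is kept commented out
    # so that importing this module stays free of side effects:
    #
    #   model = mcsasSphereModel(sld=2.0, sld_solvent=0.0)  # SLD in units of 1e-6 1/A^2
    #   kernel = model.make_kernel(measQ=[np.linspace(0.01, 1.0, 200)])
    #   I, V = kernel(radius=10.0)  # radius in the same length unit as 1/Q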
# ibid.
class simParameters(object):
    # micro-class to mimic the nested structure of SasModels in the simulation model:
defaults = {
"extrapY0": 0,
"extrapScaling": 1,
"simDataQ0": np.array([0, 0]),
"simDataQ1": None,
"simDataI": np.array([1, 1]),
"simDataISigma": np.array([0.01, 0.01]),
}
    def __init__(self):
pass
# ibid.
class simInfo(object):
    # micro-class to mimic the nested structure of SasModels in the simulation model:
parameters = simParameters()
    def __init__(self):
pass
# ibid.
class McSimPseudoModel(object):
"""pretends to be a sasmodel"""
extrapY0 = None
extrapScaling = None
# simDataDict = {} # this can't be passed on in multiprocessing arguments,
# so need to pass on individual bits:
    simDataQ0 = []  # first element of the two-element Q list
simDataQ1 = None # second element of two-element Q list
simDataI = [] # intensity of simulated data
simDataISigma = [] # uncertainty on intensity of simulated data
settables = [
"extrapY0",
"extrapScaling",
"simDataQ0",
"simDataQ1",
"simDataI",
"simDataISigma",
]
Ipolator = None # interp1D instance for interpolating intensity
ISpolator = None # interp1D instance for interpolating uncertainty on intensity
measQ = None # needs to be set later when initializing
info = simInfo()
    def __init__(self, **kwargs: dict) -> None:
# reset values to make sure we're not inheriting anything from another instance:
self.extrapY0 = None
self.extrapScaling = None
# simDataDict = {} # this can't be passed on in multiprocessing arguments,
# so need to pass on individual bits:
        self.simDataQ0 = []  # first element of the two-element Q list
self.simDataQ1 = None # second element of two-element Q list
self.simDataI = [] # intensity of simulated data
self.simDataISigma = [] # uncertainty on intensity of simulated data
self.Ipolator = None # interp1D instance for interpolating intensity
self.ISpolator = None # interp1D instance for interpolating uncertainty on intensity
self.measQ = None # needs to be set later when initializing
self.info = simInfo()
# overwrites settings loaded from file if specified.
for key, value in kwargs.items():
assert (
key in self.settables
), "Key '{}' is not a valid settable option. Valid options are: \n {}".format(
key, self.settables
)
setattr(self, key, value)
# if not 'simDataDict' in kwargs.keys():
assert all(
[
key in kwargs.keys()
for key in ["simDataQ0", "simDataQ1", "simDataI", "simDataISigma"]
]
), (
"The following input arguments must be provided to describe the simulation data:"
" simDataQ0, simDataQ1, simDataI, simDataISigma"
)
# self.simDataDict = {
# 'Q': (self.simDataQ0, self.simDataQ1),
# 'I': self.simDataI,
# 'ISigma': self.simDataISigma
# }
# initialize interpolators and extrapolators:
self.Ipolator = interpolate.interp1d(
self.simDataQ0,
self.simDataI,
kind="linear",
bounds_error=False,
fill_value=(self.simDataI[0], np.nan),
)
self.ISpolator = interpolate.interp1d(
self.simDataQ0,
self.simDataISigma,
kind="linear",
bounds_error=False,
fill_value=(self.simDataISigma[0], np.nan),
)
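    # Note on the fill_value tuples above: below the simulated Q range interp1d reuses
    # the first simulated value, while above it it returns NaN; those NaNs are later
    # replaced by extrapolatorHighQ() in interpscale(). A small illustrative sketch
    # (the numbers are made up, and it is kept commented out to avoid import side effects):
    #
    #   f = interpolate.interp1d([1.0, 2.0], [10.0, 20.0], bounds_error=False,
    #                            fill_value=(10.0, np.nan))
    #   f(0.5)  # -> 10.0 (clamped to the first value)
    #   f(3.0)  # -> nan  (to be filled by the high-Q extrapolation)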
    def make_kernel(self, measQ: np.ndarray):  # returns the kernel function itself (a bound method)
self.measQ = measQ
return self.kernelfunc
    # high-Q extrapolator: a flat background plus a Porod-like Q^-4 term, based on the previously determined fit values:
def extrapolatorHighQ(self, Q: np.ndarray) -> np.ndarray:
y0 = self.extrapY0 # 2.21e-09
scaling = self.extrapScaling # 9.61e+01
return y0 + Q ** (-4) * scaling
def kernelfunc(self, **parDict: dict) -> Tuple[np.ndarray, np.ndarray]:
# print('stop here. see what we have. return I, V')
return self.interpscale(Rscale=parDict["factor"])
def interpscale(
self,
Rscale: float = 1.0, # scaling factor for the data. fitting parameter.
) -> Tuple[np.ndarray, np.ndarray]:
# calculate scaled intensity:
qScaled = self.measQ[0] * Rscale
scaledSim = {
"Q": [self.measQ[0]],
"I": self.Ipolator(qScaled),
"ISigma": self.ISpolator(qScaled),
}
# fill in intensity and (large) uncertainty in the extrapolated region:
# for now we assume the uncertainty on the extrapolated region to be
# the same as the magnitude of the extrapolated region:
extrapArray = np.isnan(scaledSim["I"])
scaledSim["I"][extrapArray] = self.extrapolatorHighQ(qScaled[extrapArray])
scaledSim["ISigma"][extrapArray] = self.extrapolatorHighQ(qScaled[extrapArray])
# Return Fsq-analog, i.e. a volume-squared intensity, will be volume-weighted later
return scaledSim["I"] * Rscale**6, Rscale**3
# TODO: replace with attrs @define'd dataclass:
class McModel:
"""
Specifies the fit parameter details and contains random pickers.
Configuration can be alternatively loaded from an existing result file.
Parameters
----------
fitParameterLimits: dict of value pairs {"param1": (lower, upper), ... }
for fit parameters
staticParameters: dict of parameter-value pairs {"param2": value, ...}
to keep static during the fit
seed:
random number generator seed, should vary for parallel execution
nContrib:
number of individual SasModel contributions
from which the total model intensity is calculated
modelName:
SasModels model name to load, default 'sphere'
    Alternatively, the configuration can be loaded from a previous run:
    loadFromFile: Path
A filename from a previous optimization that contains the required settings
loadFromRepetition: int
If the filename is specified, load the parameters from this particular repetition
"""
func = None # SasModels model instance
modelName = "sphere" # SasModels model name
modelDType = "fast" # model data type, choose 'fast' for single precision
kernel = object # SasModels kernel pointer
parameterSet = None # pandas dataFrame of length nContrib, with column names of parameters
staticParameters = None # dictionary of static parameter-value pairs during MC optimization
pickParameters = None # dict of values with new random picks, named by parameter names
pickIndex = None # int showing the running number of the current contribution being tested
fitParameterLimits = None # dict of value pairs (tuples) *for fit parameters only* with lower,
# upper limits for the random function generator,
# named by parameter names
randomGenerators = None # dict with random value generators
volumes = None # array of volumes for each model contribution, calculated during execution
seed = 12345 # random generator seed, should vary for parallel execution
nContrib = 300 # number of contributions that make up the entire model
settables = [
"nContrib", # these are the allowed input arguments, can also be used later for storage
"fitParameterLimits",
"staticParameters",
"modelName",
"modelDType",
"seed",
]
def fitKeys(self) -> List[str]:
return [key for key in self.fitParameterLimits.keys()]
[docs] def __init__(
self,
loadFromFile: Optional[Path] = None,
loadFromRepetition: Optional[int] = None,
resultIndex: int = 1,
**kwargs: dict,
) -> None:
# reset everything so we're sure not to inherit anything from another instance:
self.func = None # SasModels model instance
self.modelName = "sphere" # SasModels model name
self.modelDType = "fast" # model data type, choose 'fast' for single precision
self.kernel = object # SasModels kernel pointer
self.parameterSet = (
None # pandas dataFrame of length nContrib, with column names of parameters
)
self.staticParameters = (
None # dictionary of static parameter-value pairs during MC optimization
)
self.pickParameters = None # dict of values with new random picks,
# named by parameter names
self.pickIndex = (
None # int showing the running number of the current contribution being tested
)
self.fitParameterLimits = None # dict of value pairs (tuples) *for fit parameters only*
# with lower, upper limits for the random function
# generator, named by parameter names
self.randomGenerators = None # dict with random value generators
self.volumes = (
None # array of volumes for each model contribution, calculated during execution
)
self.seed = 12345 # random generator seed, should vary for parallel execution
self.nContrib = 300 # number of contributions that make up the entire model
# make sure we store and read from the right place.
self.resultIndex = McHDF.ResultIndex(resultIndex) # defines the HDF5 root path
if loadFromFile is not None:
# nContrib is reset with the length of the tables:
self.load(loadFromFile, loadFromRepetition)
# overwrites settings loaded from file if specified.
for key, value in kwargs.items():
assert (
key in self.settables
), "Key '{}' is not a valid settable option. Valid options are: \n {}".format(
key, self.settables
)
setattr(self, key, value)
if self.randomGenerators is None:
self.randomGenerators = dict.fromkeys(
[key for key in self.fitKeys()],
np.random.RandomState(self.seed).uniform,
)
if self.parameterSet is None:
self.parameterSet = pandas.DataFrame(index=range(self.nContrib), columns=self.fitKeys())
self.resetParameterSet()
if self.modelName.lower() == "sim":
self.loadSimModel()
elif self.modelName.lower() == "mcsas_sphere":
self.loadMcsasSphereModel()
else:
self.loadModel()
self.checkSettings()
def checkSettings(self) -> None:
for key in self.settables:
if key in ("seed",):
continue
val = getattr(self, key, None)
            assert val is not None, "required McModel setting {} has not been defined.".format(key)
assert self.func is not None, "SasModels function has not been loaded"
assert self.parameterSet is not None, "parameterSet has not been initialized"
def calcModelIV(self, parameters: dict) -> Tuple[np.ndarray, np.ndarray]:
# moved from McCore
if (self.modelName.lower() != "sim") and (self.modelName.lower() != "mcsas_sphere"):
# Fsq has been checked with Paul Kienzle, is the part in the square brackets squared
# as in this equation (http://www.sasview.org/docs/user/models/sphere.html).
# So needs to be divided by the volume.
if isinstance(self.kernel, sasmodels.product.ProductKernel):
# call_Fq not available
Fsq = sasmodels.direct_model.call_kernel(
self.kernel, dict(self.staticParameters, **parameters)
)
# might slow it down considerably, but it appears this is the way
# to get the volume for productkernels
V_shell = self.kernel.results()["volume"]
# this needs to be done for productKernel:
Fsq = Fsq * V_shell
else:
F, Fsq, R_eff, V_shell, V_ratio = sasmodels.direct_model.call_Fq(
self.kernel, dict(self.staticParameters, **parameters)
)
else:
Fsq, V_shell = self.kernel(**dict(self.staticParameters, **parameters))
# modelIntensity = Fsq/V_shell
# modelVolume = V_shell
# TODO: check if this is correct also for the simulated data...
# Volume-weighting seems correct for the SasView models at least
# division by 4/3 np.pi seems to be necessary to bring the absolute intensity in line
# return Fsq / V_shell / (4 / 3 * np.pi), V_shell
return Fsq / V_shell, V_shell
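    # A minimal usage sketch of calcModelIV (hedged: the actual call sequence lives in
    # the optimizer, which first builds self.kernel from the measured Q vectors; it is
    # kept commented out to avoid side effects on import):
    #
    #   model.kernel = model.func.make_kernel([measQ])  # sasmodels-style q_vectors list
    #   I, V = model.calcModelIV({"radius": 50.0})      # returns (Fsq / V_shell, V_shell)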
    def pick(self) -> None:
        """Picks a new random model parameter set."""
self.pickParameters = self.generateRandomParameterValues()
    def generateRandomParameterValues(self) -> dict:
        """to be deprecated as soon as models can generate their own..."""
# initialize dict with parameter-value pairs defaulting to None
returnDict = dict.fromkeys([key for key in self.fitParameterLimits])
# fill:
        for parName, (lower, upper) in self.fitParameterLimits.items():
            returnDict[parName] = self.randomGenerators[parName](lower, upper)
return returnDict
    def resetParameterSet(self) -> None:
        """Fills the model parameter set with random values."""
for contribi in range(self.nContrib):
# can be improved with a list comprehension, but this only executes once..
self.parameterSet.loc[contribi] = self.generateRandomParameterValues()
# Loading and Storing functions:
    def load(self, loadFromFile: Path, loadFromRepetition: int) -> None:
        """
        Loads a preset set of contributions from a previous optimization, stored in HDF5.
        nContrib is reset to the number of contributions in that optimization.
        """
assert (
loadFromFile is not None
), "Input filename cannot be empty. Also specify a repetition number to load."
assert (
loadFromRepetition is not None
), "Repetition number must be given when loading model parameters from a file"
path = self.resultIndex.nxsEntryPoint / "model"
self.fitParameterLimits = McHDF.loadKV(
loadFromFile, path / "fitParameterLimits", datatype="dict"
)
self.staticParameters = McHDF.loadKV(
loadFromFile, path / "staticParameters", datatype="dict"
)
self.modelName = McHDF.loadKV(
loadFromFile, path / "modelName", datatype="str"
) # .decode('utf8')
path /= f"repetition{loadFromRepetition}"
self.parameterSet = McHDF.loadKV(
loadFromFile, path / "parameterSet", datatype="dictToPandas"
)
        # rebuild the column index as a plain list; effectively a no-op for ordinary string column names:
        self.parameterSet.columns = [colname for colname in self.parameterSet.columns]
self.volumes = McHDF.loadKV(loadFromFile, path / "volumes")
self.seed = McHDF.loadKV(loadFromFile, path / "seed")
self.modelDType = McHDF.loadKV(loadFromFile, path / "modelDType", datatype="str")
self.nContrib = self.parameterSet.shape[0]
def store(self, filename: Path, repetition: int) -> None:
assert (
repetition is not None
), "Repetition number must be given when storing model parameters into a paramFile"
assert filename is not None
path = self.resultIndex.nxsEntryPoint / "model"
McHDF.storeKVPairs(filename, path / "fitParameterLimits", self.fitParameterLimits.items())
McHDF.storeKVPairs(filename, path / "staticParameters", self.staticParameters.items())
McHDF.storeKV(
filename, path=path / "modelName", value=str(self.modelName)
) # store modelName
psDict = self.parameterSet.copy().to_dict(orient="split")
McHDF.storeKVPairs(
filename, path / f"repetition{repetition}" / "parameterSet", psDict.items()
)
McHDF.storeKVPairs(
filename,
path / f"repetition{repetition}",
[("seed", self.seed), ("volumes", self.volumes), ("modelDType", self.modelDType)],
)
# SasView SasModel helper functions:
def availableModels(self) -> None:
        # print all available SasModels, split into 1D-only and 1D+2D-capable models:
print("\n \n 1D-only SasModel Models:\n")
for model in sasmodels.core.list_models():
modelInfo = sasmodels.core.load_model_info(model)
if not modelInfo.parameters.has_2d:
print("{} is available only in 1D".format(modelInfo.id))
print("\n \n 2D- and 1D- SasModel Models:\n")
for model in sasmodels.core.list_models():
modelInfo = sasmodels.core.load_model_info(model)
if modelInfo.parameters.has_2d:
print("{} is available in 1D and 2D".format(modelInfo.id))
def modelExists(self) -> bool:
return True
# todo: this doesn't work anymore when combining models, e.g. sphere@hardsphere
# # checks whether the given model name exists, throw exception if not
# assert (
# self.modelName in sasmodels.core.list_models()
# ), "Model with name: {} does not exist in the list of available models: \n {}".format(
# self.modelName, sasmodels.core.list_models()
# )
# return True
def loadModel(self) -> None:
# loads sasView model and puts the handle in the right place:
self.modelExists() # check if model exists
self.func = sasmodels.core.load_model(self.modelName, dtype=self.modelDType)
def loadMcsasSphereModel(self) -> None:
        self.func = mcsasSphereModel(
            # the static parameters (sld, sld_solvent, ...) are passed straight through as keyword arguments:
            **self.staticParameters
        )
def loadSimModel(self) -> None:
if "simDataQ1" not in self.staticParameters.keys():
# if it was None when written, it might not exist when loading
self.staticParameters.update({"simDataQ1": None})
self.func = McSimPseudoModel(
extrapY0=self.staticParameters["extrapY0"],
extrapScaling=self.staticParameters["extrapScaling"],
simDataQ0=self.staticParameters["simDataQ0"],
simDataQ1=self.staticParameters["simDataQ1"],
simDataI=self.staticParameters["simDataI"],
simDataISigma=self.staticParameters["simDataISigma"],
)
# simDataDict= self.staticParameters['simDataDict'])
def showModelParameters(self) -> dict:
# find out what the parameters are for the set model, e.g.:
# mc.showModelParameters()
assert (
self.func is not None
), "Model must be loaded already before this function can be used, using self.loadModel()"
return self.func.info.parameters.defaults