# Source code for mcsas3.McHat

import sys
import time
from io import StringIO
from pathlib import Path, PurePosixPath
from typing import Optional

import numpy as np

import mcsas3.McHDF as McHDF

from .mccore import McCore
from .mcmodel import McModel
from .mcopt import McOpt

STORE_LOCK = None


[docs]def initStoreLock(lock): global STORE_LOCK STORE_LOCK = lock
# TODO: use attrs to @define a McHat dataclass
[docs]class McHat: """ The hat sits on top of the McCore. It takes care of parallel processing of each repetition. """ _measData = None # measurement data dict with entries for Q, I, ISigma _modelArgs = None # dict with settings to be passed on to the model instance _optArgs = None # dict with optimization settings to be passed on to the optimization instance _model = None # McModel instance for multiple repetitions _opt = None # McOpt instance for multiple repetitions nCores = 0 # number of cores to use for parallelization, # 0: autodetect, 1: without multiprocessing nRep = 10 # number of independent repetitions to opitimize storeKeys = [ # keys to store in an output file "nCores", "nRep", ] loadKeys = storeKeys
[docs] def __init__( self, loadFromFile: Optional[Path] = None, resultIndex: int = 1, **kwargs: dict ) -> None: # reset to make sure we're not inheriting any settings from another instance: self._measData = None # measurement data dict with entries for Q, I, ISigma self._modelArgs = None # dict with settings to be passed on to the model instance self._optArgs = ( None # dict with optimization settings to be passed on to the optimization instance ) self._model = None # McModel instance for multiple repetitions self._opt = None # McOpt instance for multiple repetitions self.nCores = 0 # number of cores to use for parallelization, # 0: autodetect, 1: without multiprocessing self.nRep = 10 # number of independent repetitions to opitimize """kwargs accepts all parameters from McModel and McOpt.""" # make sure we store and read from the right place. self.resultIndex = McHDF.ResultIndex(resultIndex) # defines the HDF5 root path if loadFromFile is not None: self.load(loadFromFile) self._optArgs = dict([(key, kwargs.pop(key)) for key in McOpt.storeKeys if key in kwargs]) self._optArgs.update({"resultIndex": resultIndex}) self._modelArgs = dict( [(key, kwargs.pop(key)) for key in McModel.settables if key in kwargs] ) self._modelArgs.update({"resultIndex": resultIndex}) for key, value in kwargs.items(): assert key in self.storeKeys, "Key {} is not a valid option".format(key) setattr(self, key, value) assert self.nRep > 0, "Must optimize for at least one repetition"
def fillFitParameterLimits(self, measData: dict) -> None: for key, val in self._modelArgs["fitParameterLimits"].items(): if isinstance(val, str): assert val == "auto", ( "Only fit parameter options are either providing [min, max] limits or setting" ' to "auto"' ) # auto-fill values assert ( np.min(measData["Q"]) > 0 ), "for auto-scaling of measurement limits, the smallest Q value cannot be zero" self._modelArgs["fitParameterLimits"][key] = [ np.pi / np.max(measData["Q"]), np.pi / np.min(measData["Q"]), ]
[docs] def run(self, measData: dict, filename: Path, resultIndex: int = 1) -> None: """runs the full sequence: multiple repetitions of optimizations, to be parallelized. This probably needs to be taken out of core, and into a new parent""" # ensure the fit parameter limits are filled in based on the data limits if auto self.fillFitParameterLimits(measData) if self.nCores == 1: for rep in range(self.nRep): self.runOnce(measData, filename, rep, resultIndex=resultIndex) # elif self.nCores == 2: # print([(measData, filename, r) for r in range(self.nRep)]) else: import multiprocessing if self.nCores == 0: # don't run more processes than we need... self.nCores = np.minimum(multiprocessing.cpu_count(), self.nRep) start = time.time() lock = multiprocessing.Lock() pool = multiprocessing.Pool(self.nCores, initializer=initStoreLock, initargs=(lock,)) runArgs = [(measData, filename, r, True, resultIndex) for r in range(self.nRep)] outputs = pool.starmap(self.runOnce, runArgs) pool.close() pool.join() print( "McSAS analysis with {} repetitions took {:.1f}s with {} threads.".format( self.nRep, time.time() - start, min(self.nCores, self.nRep) ) ) # for args in runArgs: # buf = args[-1] # print(buf, buf.getvalue()) # last argument is stdio buffer for output in sorted(outputs, key=lambda x: x[0]): print(output)
[docs] def runOnce( self, measData: dict, filename: Path, repetition: int = 0, bufferStdIO: bool = False, resultIndex: int = 1, ) -> None: """runs the full sequence: multiple repetitions of optimizations, to be parallelized. This probably needs to be taken out of core, and into a new parent""" if bufferStdIO: # buffer stdout/err in an individual StringIO object for each repetition sys.stderr = sys.stdout = StringIO() if self._opt is None: self._opt = McOpt(**self._optArgs) if self._model is None: self._model = McModel(**self._modelArgs) self._opt.repetition = repetition self._model.resetParameterSet() mc = McCore(measData, model=self._model, opt=self._opt, resultIndex=resultIndex) mc.optimize() try: self._model.kernel.release() except AttributeError: pass # can happen with a simulation model except Exception as e: print(f"{mc}: {e}: {str(e)}\n") print("Final chiSqr: {}, N accepted: {}".format(self._opt.gof, self._opt.accepted)) # storing the results if STORE_LOCK is not None: # prevent multiple threads writing HDF5 file simultaneously STORE_LOCK.acquire() try: mc.store(filename=filename) self.store(filename=filename) except Exception as e: print(f"{mc}: {e}: {str(e)}\n") finally: if STORE_LOCK is not None: STORE_LOCK.release() if bufferStdIO: # return buffered output if desired return sys.stdout.getvalue() return
# same as in McOpt
[docs] def store(self, filename: Path, path: Optional[PurePosixPath] = None) -> None: """stores the settings in an output file (HDF5)""" if path is None: path = self.resultIndex.nxsEntryPoint / "optimization" McHDF.storeKVPairs( filename, path, [(key, getattr(self, key, None)) for key in self.storeKeys] )
# same as in McOpt, except for the repetition (in McOpt) def load(self, filename: Path, path: Optional[PurePosixPath] = None) -> None: if path is None: path = self.resultIndex.nxsEntryPoint / "optimization" for key, value in McHDF.loadKVPairs(filename, path, self.loadKeys): setattr(self, key, value)