Source code for diffpy.pdfgui.control.fitting

#!/usr/bin/env python
##############################################################################
#
# PDFgui            by DANSE Diffraction group
#                   Simon J. L. Billinge
#                   (c) 2006 trustees of the Michigan State University.
#                   All rights reserved.
#
# File coded by:    Jiwu Liu
#
# See AUTHORS.txt for a list of people who contributed.
# See LICENSE.txt for license information.
#
##############################################################################

from __future__ import print_function

import pickle
import threading
import time

from diffpy.pdfgui.control.controlerrors import ControlError, ControlStatusError, ControlValueError
from diffpy.pdfgui.control.organizer import Organizer
from diffpy.pdfgui.utils import safeCPickleDumps

# helper routines to deal with PDFfit2 exceptions


[docs] def getEngineExceptions(): """Return a tuple of possible exceptions from diffpy.pdffit2.pdffit2.""" from diffpy.pdffit2.pdffit2 import ( calculationError, constraintError, dataError, structureError, unassignedError, ) engine_exceptions = ( dataError, unassignedError, constraintError, structureError, calculationError, ) return engine_exceptions
[docs] def handleEngineException(error, gui=None): """Common handler of PDFfit2 engine exceptions. error -- instance of PDFfit2 exception gui -- reference to GUI when active """ errorInfo = "(%s)\n%s" % (error.__class__.__name__, str(error)) # be more verbose for Singular matrix exception if "singular matrix" in errorInfo.lower(): errorInfo += ( "\n\n" "Common reasons are degeneracy in fit parameters,\n" "zero thermal factors or fit range starting at zero." ) if gui: gui.postEvent(gui.ERROR, "<Engine exception> %s" % errorInfo) else: print("<Engine exception> %s" % errorInfo) return
##############################################################################
[docs] class Fitting(Organizer): """Fitting is the class to control a PdfFit process running locally. Fitting will start a new thread to interact with the PdfFit server. rw: fitness parameter tolerancy: accuracy requirement step: current refinement step res: fitting result string parameters: parameter dictionary """ # Fit status -- mask 0xff INITIALIZED = 1 CONNECTED = 1 << 1 CONFIGURED = 1 << 2 DONE = 1 << 3 # JOB Status -- mask 0xff00 VOID = 1 << 8 QUEUED = 1 << 9 RUNNING = 1 << 10 PAUSED = 1 << 11
[docs] class Worker(threading.Thread): """Worker is the daemon thread of fitting.""" def __init__(self, fitting): """Worker ( self, fitting) --> initialize. fitting -- fitting object """ threading.Thread.__init__(self) self.fitting = fitting
[docs] def run(self): """Overload function from Thread.""" try: self.fitting.run() except ControlError as error: gui = self.fitting.controlCenter.gui if gui: gui.postEvent(gui.ERROR, "<Fitting exception> %s" % error.info) else: print("<Fitting exception> %s" % error.info) except getEngineExceptions() as error: gui = self.fitting.controlCenter.gui handleEngineException(error, gui) return
def __init__(self, name): """initialize. name -- name of this fitting """ Organizer.__init__(self, name) # Thread, status, and control variables self.thread = None self.pauseEvent = threading.Event() self.fitStatus = Fitting.INITIALIZED self.jobStatus = Fitting.VOID self.stopped = False self.paused = False # the PDFfit2 server instance. self.server = None # public data members self.step = 0 self.parameters = {} self.rw = 1.0 self.tolerancy = 0.001 self.res = "" self.snapshots = [] self.res = "" # All the calculated data are to be stored in a list. # Such flat storage require unique index for each data item # self.dataNameDict translate named data to an index self.dataNameDict = {} self.itemIndex = 0 def __changeStatus(self, fitStatus=None, jobStatus=None): """Change current status of fitting. fitStatus -- new fitting status jobStatus -- new thread status """ self.fitStatus = fitStatus or self.fitStatus self.jobStatus = jobStatus or self.jobStatus if fitStatus or jobStatus: # either of them is not None gui = self.controlCenter.gui if gui: gui.postEvent(gui.UPDATE, self) gui.postEvent(gui.OUTPUT, None) def _release(self): """Release resources.""" if self.server: # server has been allocated, we need free the memory self.server.reset() def _getStrId(self): """Make a string identifier. return value: string id """ return "f_" + self.name
[docs] def copy(self, other=None): """Copy self to other. if other is None, create an instance. other -- ref to other object return value: reference to copied object """ if other is None: other = Fitting(self.name) import copy Organizer.copy(self, other) other.parameters = copy.deepcopy(self.parameters) other.snapshots = copy.deepcopy(self.snapshots) other.res = copy.deepcopy(self.res) other.dataNameDict = copy.deepcopy(self.dataNameDict) other.itemIndex = self.itemIndex return other
[docs] def load(self, z, subpath): """Load data from a zipped project file. z -- zipped project file subpath -- path to its own storage within project file returns a tree of internal hierarchy """ # subpath = projName/fitName/ subs = subpath.split("/") rootDict = z.fileTree[subs[0]][subs[1]] if "parameters" in rootDict: from diffpy.pdfgui.control.pdfguicontrol import CtrlUnpickler self.parameters = CtrlUnpickler.loads(z.read(subpath + "parameters")) if "steps" in rootDict: self.itemIndex, self.dataNameDict, self.snapshots = pickle.loads( z.read(subpath + "steps"), encoding="latin1" ) if "result" in rootDict: self.rw, self.res = pickle.loads(z.read(subpath + "result"), encoding="latin1") return Organizer.load(self, z, subpath)
[docs] def save(self, z, subpath): """Save data from a zipped project file. z -- zipped project file subpath -- path to its own storage within project file """ if self.parameters: spkl = safeCPickleDumps(self.parameters) z.writestr(subpath + "parameters", spkl) if self.res: spkl = safeCPickleDumps((self.rw, self.res)) z.writestr(subpath + "result", spkl) if self.snapshots: spkl = safeCPickleDumps((self.itemIndex, self.dataNameDict, self.snapshots)) z.writestr(subpath + "steps", spkl) Organizer.save(self, z, subpath) return
[docs] def stripped(self): """Make a copy stripped of all unpickleable data members. The copy should be suitable for pickling and has the following data members removed: controlCenter, lock, pauseEvent, thread returns reference to stripped copy """ unpickleables = ("controlCenter", "lock", "pauseEvent", "thread") naked = self.copy() for a in unpickleables: if a in self.__dict__: delattr(naked, a) return naked
[docs] def updateParameters(self): """Update parameters dictionary from active constraints. returns self.parameters """ # create dictionary of parameters used in constraints cpars = {} for struc in self.strucs: for idx, par in struc.findParameters().items(): if idx not in cpars: cpars[idx] = par for dataset in self.datasets: for idx, par in dataset.findParameters().items(): if idx not in cpars: cpars[idx] = par # add new parameters for idx, par in cpars.items(): if idx not in self.parameters: self.parameters[idx] = par # remove unused parameters unused = [idx for idx in self.parameters if idx not in cpars] for idx in unused: del self.parameters[idx] return self.parameters
[docs] def applyParameters(self): """Evaluate all constrained variables using current parameters.""" for struc in self.strucs: struc.applyParameters(self.parameters) for dataset in self.datasets: dataset.applyParameters(self.parameters) return
[docs] def changeParameterIndex(self, oldidx, newidx): """Change a parameter index to a new value. This will replace all instances of one parameter name with another in the containing fit. """ # Change the index in the current structure for struc in self.strucs: struc.changeParameterIndex(oldidx, newidx) for dataset in self.datasets: dataset.changeParameterIndex(oldidx, newidx) # Change the index if appears in linking equation initial values, e.g. # '=thisfitname:oldidx' -> '=thisfitname:newidx' fiteq = "=%s:%i" % (self.name, oldidx) newfiteq = "=%s:%i" % (self.name, newidx) from diffpy.pdfgui.control.pdfguicontrol import pdfguicontrol fits = pdfguicontrol().fits for fit in fits: parameters = fit.parameters for par in parameters.values(): if par.initialStr() == fiteq: par.setInitial(newfiteq) return
[docs] def queue(self, enter=True): """Queue or dequeue self. enter -- True to queue, False to dequeue """ if enter: if self.jobStatus == Fitting.VOID: self.__changeStatus(jobStatus=Fitting.QUEUED) else: if self.jobStatus == Fitting.QUEUED: self.__changeStatus(jobStatus=Fitting.VOID)
[docs] def getServer(self): """Get a PDFfit2 instance either locally or remotely.""" if self.fitStatus != Fitting.INITIALIZED: return # create a new instance of calculation server from diffpy.pdffit2 import PdfFit self.server = PdfFit() self.__changeStatus(fitStatus=Fitting.CONNECTED)
[docs] def configure(self): """Configure fitting.""" if self.fitStatus != Fitting.CONNECTED: return # make sure parameters are initialized self.updateParameters() self.server.reset() for struc in self.strucs: struc.clearRefined() self.server.read_struct_string(struc.initial.writeStr("pdffit")) for key, var in struc.constraints.items(): self.server.constrain(key, var.formula) # phase parameters configured for dataset in self.datasets: dataset.clearRefined() self.server.read_data_string( dataset.writeResampledObsStr(), dataset.stype, dataset.qmax, dataset.qdamp, ) self.server.setvar("qbroad", dataset.qbroad) for key, var in dataset.constraints.items(): self.server.constrain(key, var.formula) # Removed call to pdfrange call, because data were already # resampled to at fit range. # # Pair selection applies only to the current dataset, # therefore it has to be done here. nstrucs = len(self.strucs) for phaseidx, struc in zip(range(1, nstrucs + 1), self.strucs): struc.applyPairSelection(self.server, phaseidx) for index, par in self.parameters.items(): # clean any refined value par.refined = None self.server.setpar(index, par.initialValue()) # info[0] = init value # fix if fixed. Note: all parameters are free after self.server.reset(). if par.fixed: self.server.fixpar(index) # build name dict self.buildNameDict() self.__changeStatus(fitStatus=Fitting.CONFIGURED) return
[docs] def resetStatus(self): """Reset status back to initialized.""" self.snapshots = [] self.step = 0 if self.fitStatus == Fitting.INITIALIZED: return # already reset # This status will mandate allocation of a new PdfFit instance self.__changeStatus(fitStatus=Fitting.INITIALIZED)
[docs] def run(self): """Function to be run in daemon thread.""" # Begin self.__changeStatus(jobStatus=Fitting.RUNNING) try: for calc in self.calcs: calc.start() while not self.stopped and self.datasets: if not self.paused: # quick check to make sure status is right # will do nothing if status is CONFIGURED self.getServer() self.configure() # if self.refine_step return True, fitting is finished if self.refine_step(): break else: # Wait on an event, pause for a while self.__changeStatus(jobStatus=Fitting.PAUSED) self.pauseEvent.wait() # Recover from pause now self.__changeStatus(jobStatus=Fitting.RUNNING) finally: # whatever happened, resource should be released. self._release() # job status should be changed because of thread exit self.__changeStatus(jobStatus=Fitting.VOID) return
def _configureBondCalculation(self, struc): """Prepare server for bond angle or bond length calculation. struc -- instance of PDFStructure No return value. """ # struc can be handle to FitStructure.initial # let's make sure it is synchronized with current parameters self.applyParameters() self.getServer() self.server.reset() strucstr = struc.writeStr("pdffit") self.server.read_struct_string(strucstr) return
[docs] def outputBondAngle(self, struc, i, j, k): """Output bond angle defined by atoms i, j, k. The angle is calculated using the shortest lengths ji and jk with respect to periodic boundary conditions. struc -- instance of PDFStructure i, j, k -- atom indices starting at 1 No return value. The result should be automatically added to the Output Window, because all server output is sent there. Raise ControlValueError for invalid indices i, j, k. """ try: self._configureBondCalculation(struc) self.server.bang(i, j, k) self._release() except getEngineExceptions() as error: gui = self.controlCenter.gui handleEngineException(error, gui) except ValueError as error: raise ControlValueError(str(error)) return
[docs] def outputBondLengthAtoms(self, struc, i, j): """Output shortest bond between atoms i, j. Periodic boundary conditions are applied to find the shortest bond. struc -- instance of PDFStructure i, j -- atom indices starting at 1 No return value. The result should be automatically added to the Output Window, because all server output is sent there. Raise ControlValueError for invalid indices i, j. """ try: self._configureBondCalculation(struc) self.server.blen(i, j) self._release() except getEngineExceptions() as error: gui = self.controlCenter.gui handleEngineException(error, gui) except ValueError as error: raise ControlValueError(str(error)) return
[docs] def outputBondLengthTypes(self, struc, a1, a2, lb, ub): """Output all a1-a2 bond lengths within specified range. struc -- instance of PDFStructure a1 -- symbol of the first element in pair or "ALL" a2 -- symbol of the second element in pair or "ALL" lb -- lower bond length boundary ub -- upper bond length boundary No return value. The result should be automatically added to the Output Window, because all server output is sent there. Raise ControlValueError for invalid element symbols. """ try: self._configureBondCalculation(struc) self.server.blen(a1, a2, lb, ub) self._release() except getEngineExceptions() as error: gui = self.controlCenter.gui handleEngineException(error, gui) except ValueError as error: raise ControlValueError(str(error)) return
[docs] def pause(self, bPause=None): """Pause ( self, bPause = None ) --> pause a fitting process. bPause -- True to pause, False to restart. If None, it will figure out by itself. """ if bPause is None: bPause = self.jobStatus == Fitting.RUNNING if bPause: self.paused = True else: self.paused = False self.pauseEvent.set()
[docs] def start(self): """Start fitting.""" # check if paused if self.jobStatus == Fitting.PAUSED: self.pause(False) return # clean up control variable self.stopped = False self.paused = False self.resetStatus() # Restart fitting require another thread instance. self.thread = Fitting.Worker(self) self.thread.start()
[docs] def stop(self): """Stop the fitting.""" self.stopped = True # wake up daemon thread if it is paused if self.jobStatus == Fitting.PAUSED: self.pause(False)
[docs] def isThreadRunning(self): """Check if fitting thread is running. return: True if running, False otherwise """ return self.thread is not None and self.thread.is_alive()
[docs] def join(self): """Wait for current fitting to finish.""" if self.thread: self.thread.join() self.thread = None
[docs] def close(self, force=False): """Close up the fitting in order to exit. force -- if force to exit """ if force: if self.isThreadRunning(): self.stop() # NOTE: Not waiting for thread to stop. There's no graceful # way while user choose to stop forcefully else: if self.isThreadRunning(): raise ControlStatusError("Fitting: Fitting %s is still running" % self.name) if self.thread is not None: self.thread.join()
[docs] def buildNameDict(self): """Build up a data name dictionary, which will map data name to a unique index. The private dataNameDict has such structure: { 'd_data1':{'Gobs':12, 'Gcalc':11, ....}, 'd_data2':{'Gobs':10, 'Gcalc':9, ....}, ... 'p_ph1':{'lat(1)':1,'lat(2)':2, .....}, 'p_ph1':{'lat(1)':3,'lat(2)':4, .....}, ... 'f_fit':{'rw':100, 1:101, 2:102} } The value of each sub-dict is the corresponding index of this data item in the snapshot. The prefix d_ p_ f_ make dataset,struc,fitname unique within the shared name space of dictionary """ self.itemIndex = 0 dataNameDict = {} # dataNameDict for datasets for dataset in self.datasets: id = dataset._getStrId() dataNameDict[id] = {} for itemName in list(dataset.constraints.keys()) + ["Gcalc", "crw"]: dataNameDict[id][itemName] = self.itemIndex self.itemIndex += 1 # dataNameDict for strucs for struc in self.strucs: id = struc._getStrId() dataNameDict[id] = {} for itemName in struc.constraints.keys(): dataNameDict[id][itemName] = self.itemIndex self.itemIndex += 1 # dataNameDict for self id = self._getStrId() dataNameDict[id] = {} dataNameDict[id]["rw"] = self.itemIndex self.itemIndex += 1 for parameter in self.parameters.keys(): dataNameDict[id][parameter] = self.itemIndex self.itemIndex += 1 # assign to self self.dataNameDict = dataNameDict
[docs] def appendStep(self, source): """After a refinement step is done, append all data from self to the historical storage, i.e., self.snapshots. source -- where to get the fitted data, in deed it's a PdfFit2 instance """ # self.itemIndex should store total number of items snapshot = [None] * self.itemIndex # update datasets seq = 1 for dataset in self.datasets: id = dataset._getStrId() # set current dataset source.setdata(seq) # use nameDict for current dataset nameDict = self.dataNameDict[id] # get values for constrained variables for name in dataset.constraints.keys(): snapshot[nameDict[name]] = source.getvar(name) snapshot[nameDict["Gcalc"]] = dataset.Gcalc snapshot[nameDict["crw"]] = dataset.crw seq += 1 # update strucs seq = 1 for struc in self.strucs: id = struc._getStrId() # set current struc source.setphase(seq) # use nameDict for current struc nameDict = self.dataNameDict[id] # get values for constrained variables for name in struc.constraints.keys(): snapshot[nameDict[name]] = source.getvar(name) seq += 1 # update global data id = self._getStrId() nameDict = self.dataNameDict[id] snapshot[nameDict["rw"]] = self.rw for parameter in self.parameters.keys(): snapshot[nameDict[parameter]] = source.getpar(parameter) self.snapshots.append(snapshot)
[docs] def refine_step(self): """Run a single step of the fit. return value: True if refinement is finished, otherwise False """ if self.fitStatus == Fitting.DONE: # do nothing but return finished return True finished = self.server.refine_step(self.tolerancy) # get fitted data idataset = 1 for dataset in self.datasets: dataset.obtainRefined(self.server, idataset) idataset += 1 # get refined structure istruc = 1 for struc in self.strucs: struc.obtainRefined(self.server, istruc) istruc += 1 # update parameters for idx, par in self.parameters.items(): par.refined = self.server.getpar(idx) self.rw = self.server.getrw() self.step += 1 self.appendStep(self.server) # update plots and structure renderer gui = self.controlCenter.gui if gui: gui.postEvent(gui.OUTPUT, None) gui.postEvent(gui.PLOTNOW, self) if finished: self.res = "* %s\n\n" % time.ctime() + self.server.save_res_string() self.__changeStatus(fitStatus=Fitting.DONE) return finished
[docs] def getYNames(self): """Get names of data item which can be plotted as y. returns a name str list """ names = list(self.parameters.keys()) names.append("rw") return names
[docs] def getXNames(self): """Get names of data item which can be plotted as x. returns a name str list """ return []
[docs] def getData(self, name, step=-1): """Get self's data member. name -- data item name step -- step info, it can be: (1) a number ( -1 means latest step ): for single step (2) a list of numbers: for multiple steps (3) None: for all steps returns data object, be it a single number, a list, or a list of list """ # FIXME: for next plot interface, we need find how many steps the # plotter is requiring for and make exact same number of copies of # data by name data = self.getMetaData(name) if data is not None: return data return self._getData(self, name, step)
[docs] def getMetaDataNames(self): """Return all applicable meta data names.""" names = [] for dataset in self.datasets: # build up the name list if not names: names = list(dataset.metadata.keys()) else: for name in names[:]: if name not in dataset.metadata: names.remove(name) return names
[docs] def getMetaData(self, name): """Get meta data value. name -- meta data name returns meta data value """ try: return self.datasets[0].metadata[name] except (KeyError, IndexError): return None
def _getData(self, id, name, step=-1): """Get any data member from snapshots. id -- reference to a Fitting/Calculation/Phase/DataSet object name -- data item name step -- step info, it can be: (1) a number ( -1 means latest step ): for single step (2) a list of numbers: for multiple steps (3) None: for all steps returns data object, be it a single number, a list, or a list of list """ # find the unique index if len(self.snapshots) == 0: return None try: # if it is a 'int', it must be parameter. So only fitting has its value. if isinstance(name, int): id = self nameDict = self.dataNameDict[id._getStrId()] index = nameDict[name] except KeyError: return None # data is not ready if step is None: return [snapshot[index] for snapshot in self.snapshots] elif isinstance(step, list): return [self.snapshots[i][index] for i in step] else: return self.snapshots[step][index]
# End of file