#!/usr/bin/env python
##############################################################################
#
# SrMise by Luke Granlund
# (c) 2014 trustees of the Michigan State University
# (c) 2024 trustees of Columia University in the City of New York
# All rights reserved.
#
# File coded by: Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################
"""Module for representing instances of mathematical functions.
Classes
-------
ModelPart: Superclass of Peak and Baseline
ModelParts: Collection (list) of ModelPart instances.
"""
import logging
from importlib.metadata import version
import matplotlib.pyplot as plt
import numpy as np
# Output of scipy.optimize.leastsq for a single parameter changed in scipy 0.8.0
# Before it returned a scalar, later it returned an array of length 1.
from packaging.version import parse
from scipy.optimize import leastsq
from diffpy.srmise import srmiselog
from diffpy.srmise.srmiseerrors import SrMiseFitError, SrMiseStaticOwnerError, SrMiseUndefinedCovarianceError
logger = logging.getLogger("diffpy.srmise")
__spv__ = version("scipy")
__oldleastsqbehavior__ = parse(__spv__) < parse("0.8.0")
[docs]
class ModelParts(list):
"""A collection of ModelPart instances.
Methods
-------
copy: Return deep copy
fit: Fit to given data
npars: Return total number of parameters
pack_freepars: Update free parameters with values in given sequence
residual: Return residual of model
residual_jacobian: Return jacobian of residual of model
transform: Change format of parameters.
value: Return value of model
unpack_freepars: Return sequence containing value of all free parameters
"""
def __init__(self, *args, **kwds):
list.__init__(self, *args, **kwds)
[docs]
def fit(
self,
r,
y,
y_error,
range=None,
ntrials=0,
cov=None,
cov_format="default_output",
):
"""Chi-square fit of all free parameters to given data.
There must be at least as many free parameters as data points.
Fitting is performed with the MINPACK leastsq() routine exposed by scipy.
Parameters
----------
r : array-like
The sequence of r values over which to fit
y : array-like
The sequence of y values over which to fit
y_error : array-like
The sequence of uncertainties in y
range : slice object
The slice object specifying region of r and y over which to fit.
Fits over all the data by default.
ntrials : int
The maximum number of function evaluations while fitting.
cov : ModelCovariance instance
The Optional ModelCovariance object preserves covariance information.
cov_format : str
The parameterization to use in cov.
Returns
-------
None
"""
freepars = self.unpack_freepars()
if len(freepars) >= len(r):
emsg = (
"Cannot fit model with "
+ str(len(freepars))
+ " free parametersbut only "
+ str(len(r))
+ " data points."
)
raise SrMiseFitError(emsg)
if len(freepars) == 0:
# emsg = "Cannot fit model with no free parameters."
# raise SrMiseFitError(emsg)
return
if range is None:
range = slice(None)
args = (r, y, y_error, range)
if srmiselog.liveplots:
plt.figure(1)
plt.ioff()
plt.subplot(211)
plt.cla()
plt.title("Before")
plt.plot(r, y, label="_nolabel_")
plt.plot(
r,
(y - self.value(r, range=range)) - 1.1 * (max(y) - min(y)),
label="_nolabel_",
)
for p in self:
plt.plot(r, p.value(r, range=range), label=str(p))
plt.ion()
try:
f = leastsq(
self.residual, # minimize this function
freepars, # initial parameters
args=args, # arguments to residual, residual_jacobian
Dfun=self.residual_jacobian, # explicit Jacobian
col_deriv=True, # order of derivatives in Jacobian
full_output=True,
maxfev=ntrials,
)
except NotImplementedError:
# TODO: Figure out if is worth checking for residual_jacobian
# before leastsq(). This exception will either occur almost never
# or extremely frequently, and the extra evaluations will add up.
logger.info("One or more functions do not define residual_jacobian().")
f = leastsq(
self.residual, # minimize this function
freepars, # initial parameters
args=args, # arguments to residual
col_deriv=True, # order of derivatives in Jacobian
full_output=True,
maxfev=ntrials,
)
except Exception:
# Sadly, KeyboardInterrupt, etc. is reraised as minpack.error
# Not much I can do about that, though.
import traceback
emsg = (
"Unexpected error in modelparts.fit(). Original exception:\n"
+ traceback.format_exc()
+ "End original exception."
)
raise SrMiseFitError(emsg)
result = f[0]
if __oldleastsqbehavior__ and len(freepars) == 1:
# leastsq returns a scalar when there is only one parameter
result = np.array([result])
self.pack_freepars(result)
if srmiselog.liveplots:
plt.draw()
plt.ioff()
plt.figure(1)
plt.subplot(212)
plt.cla()
plt.title("After")
plt.ion()
plt.plot(
r,
y,
r,
(y - self.value(r, range=range)) - 1.1 * (max(y) - min(y)),
*[i for sublist in [[r, p.value(r, range=range)] for p in self] for i in sublist],
)
plt.draw()
if srmiselog.wait:
print(
"Press 'Enter' to continue...",
)
input()
if f[4] not in (1, 2, 3, 4):
emsg = "Fit did not succeed -- " + str(f[3])
raise SrMiseFitError(emsg)
# clean up parameters
for p in self:
p.pars = p.owner().transform_parameters(p.pars, in_format="internal", out_format="internal")
# Supply estimated covariance matrix if requested.
# The precise relationship between f[1] and estimated covariance matrix is a little unclear from
# the documentation of leastsq. This is the interpretation given by scipy.optimize.curve_fit,
# which is a wrapper around leastsq.
if cov is not None:
pcov = f[1]
fvec = f[2]["fvec"]
dof = len(r) - len(freepars)
cov.setcovariance(self, pcov * np.sum(fvec**2) / dof)
try:
cov.transform(in_format="internal", out_format=cov_format)
except SrMiseUndefinedCovarianceError:
logger.warning("Covariance not defined. Fit may not have converged.")
return
# # Notes on the fit f
# f[0] = solution
# f[1] = Uses the fjac and ipvt optional outputs to construct an estimate of the jacobian around the solution.
# None if a singular matrix encountered (indicates very flat curvature in some direction).
# This matrix must be multiplied by the residual variance to get the covariance of the parameter
# estimates - see curve fit.
# f[2] = dictionary{nfev: int, fvec: array(), fjac: array(), ipvt: array(), qtf: array()}
# nfev - The number of function calls made
# fvec - function (residual) evaluated at solution
# fjac - "a permutation of the R matrix of a QR factorization of the final Jacobian."
# ipvt - integer array defining a permutation matrix P such that fjac*P=QR
# qtf - transpose(q)*fvec
# f[3] = message about results of fit
# f[4] = integer flag. Fit was successful on 1,2,3, or 4. Otherwise unsuccessful.
[docs]
def npars(self, count_fixed=True):
"""Return total number of parameters in all parts.
Parameters
----------
count_fixed : bool
The boolean which determines if fixed parameters are
included in the count.
Returns
-------
n : int
The total number of parameters.
"""
n = 0
for p in self:
n += p.npars(count_fixed=count_fixed)
return n
[docs]
def pack_freepars(self, freepars):
"""Update parameters with values from sequence of freepars.
Parameters
----------
freepars : array-like
The sequence of free parameters.
Returns
-------
None
"""
if np.isnan(freepars).any():
emsg = "Non-numeric free parameters."
raise ValueError(emsg)
freeidx = 0
for p in self:
freeidx += p.update(freepars[freeidx:])
[docs]
def residual(self, freepars, r, y_expected, y_error, range=None):
"""Calculate residual of all parameters.
Parameters
----------
freepars : array-like
The sequence of free parameters
r : array-like
The input domain
y_expected : array-like
The sequence of expected values
y_error : array-like
The sequence of uncertainties in y-variable
range : slice object
The slice object specifying region of r and y over which to fit.
All the data by default.
Returns
-------
array-like
The residual of all parameters.
"""
self.pack_freepars(freepars)
total = self.value(r, range)
try:
if range is None:
range = slice(0, len(r))
return (y_expected[range] - total[range]) / y_error[range]
except TypeError:
return (y_expected - total) / y_error
[docs]
def residual_jacobian(self, freepars, r, y_expected, y_error, range=None):
"""Calculate the Jacobian of freepars.
Parameters
freepars : array-like
The sequence of free parameters
r : array-like
The input domain
y_expected : array-like
The sequence of expected values
y_error : array-like
The sequence of uncertainties in y-variable
range : slice object
The slice object specifying region of r and y over which to fit.
All the data by default.
Returns
-------
ndarray
The Jacobian of all parameters.
"""
if len(freepars) == 0:
raise ValueError(
"Argument freepars has length 0. The Jacobian " "is only defined with >=1 free parameters."
)
self.pack_freepars(freepars)
tempJac = []
for p in self:
tempJac[len(tempJac) :] = p.jacobian(r, range)
# Since the residual is (expected - calculated) the jacobian
# of the residual has a minus sign.
jac = -np.array([j for j in tempJac if j is not None])
try:
if range is None:
range = slice(0, len(r))
return jac[:, range] / y_error[range]
except TypeError:
return jac / y_error
[docs]
def value(self, r, range=None):
"""Calculate total value of all parts over range.
Parameters
----------
r : array-like
The input domain
range : slice object
The slice object specifying region of r and y over which to fit.
All the data by default.
Returns
-------
total : float
The total value of all slice region of r.
"""
total = r * 0.0
for p in self:
total += p.value(r, range)
return total
[docs]
def unpack_freepars(self):
"""Return array of all free parameters."""
# To check: ravel() sometimes returns a reference and othertimes a copy.
# Do I need to use flatten() instead?
return np.concatenate([p.compress() for p in self]).ravel()
[docs]
def covariance(self, format="internal", **kwds):
"""Return estimated covariance matrix of the model.
The covariance matrix may be given in terms of any parameterization
defined by the formats for each individual ModelPart.
Parameters
----------
format : str
The format ("internal" by default) to use for all ModelParts.
This may be overridden for specific peaks as shown below.
Keywords
--------
f0 : str
The format of the 0th ModelPart
f1 : str
The format of the 1st ModelPart
etc.
Returns
-------
cov : ndarray
The estimated covariance matrix.
"""
formats = [format for p in self]
for k, v in kwds.items():
try:
int(k[1:])
except ValueError:
emsg = "Invalid format keyword '%s'. They must be specified as 'f0', 'f1', etc." % k
raise ValueError(emsg)
formats[int(k[1:])] = v
return
[docs]
def copy(self):
"""Return deep copy of this ModelParts.
The original and the copy are completely independent, except each
ModelPart and its copy still reference the same owner.
Returns
-------
ModelParts
The deep copy of this ModelParts.
"""
return type(self).__call__([p.copy() for p in self])
def __str__(self):
"""Return string representation of this ModelParts."""
return "".join([str(p) + "\n" for p in self])
def __getitem__(self, index):
"""Extends list.__getitem__"""
if isinstance(index, tuple) and len(index) == 2:
start, end = index
return self.__class__(super().__getitem__(slice(start, end)))
else:
return super().__getitem__(index)
# End of class ModelParts
[docs]
class ModelPart(object):
"""Represents a single part (instance of some function) of a model.
Attributes
-------
pars : array-like
The array containing the parameters of this model part
free : array-like
The array containing boolean values defining whether the corresponding parameter
is free or not.
removable : bool
The boolean determining whether or not this model part can be
removed during extraction.
static_owner : bool
The boolean determines if owner can be changed with changeowner()
Methods
-------
changeowner(new_owner)
Change the owner of the model part instance.
copy()
Return a deep copy of the model part instance.
compress()
Return parameters with non-free parameters removed.
jacobian()
Compute and return the Jacobian matrix for the model part.
getfree(index=None, keyword=None)
Retrieve a free parameter by index or keyword defined by the owner.
npars()
Return the number of parameters in this model part.
owner()
Return the current owner of the model part.
setfree(index=None, value=None, keyword=None, new_value=None)
Set a free parameter by index or keyword defined by the owner.
update(values)
Update free parameters with values from a given sequence.
value()
Compute and return the value of the model part based on its parameters.
writestr()
Generate and return a string representation of the model part.
"""
def __init__(self, owner, pars, free=None, removable=True, static_owner=False):
"""Constructor for instance members.
Parameters
owner : BaseFunction subclass
The instance of a BaseFunction subclass
pars : array-like
The sequence of parameters which specify the function explicitly
free : array-like
The sequence of Boolean variables. If False, the corresponding
parameter will not be changed.
removable : bool
The boolean determines whether this part can be removed.
static_owner : bool
Whether or not the part can be changed with
changeowner()
Note that free and removable are not mutually exclusive. If any
pars are not free but removable=True then the part may be removed, but
the held parameters for this part will remain unchanged until then.
"""
self._owner = owner
if len(pars) != owner.npars:
emsg = "The length of pars must equal the number of parameters " + "specified by the model part owner."
raise ValueError(emsg)
self.pars = np.array(pars[:]) # pars[:] in case pars is a ModelPart
if free is None:
self.free = np.array([True for p in pars], dtype=bool)
else:
self.free = np.array(free, dtype=bool)
if len(self.free) != owner.npars:
emsg = (
"The length of free must be equal to the number of "
+ "parameters specified by the model part owner."
)
raise ValueError(emsg)
self.removable = removable
self.static_owner = static_owner
[docs]
def changeowner(self, owner):
"""Change the owner of this part.
Does not change the parameters associated with this model part. Raises
SrMiseStaticOwnerError if this peak has been declared to have a static
owner, or if the number of parameters is incompatible.
Parameters
----------
owner : BaseFunction subclass
The instance of a BaseFunction subclass
Returns
-------
None
"""
if self.static_owner and self._owner is not owner:
emsg = "Cannot change owner if static_owner is True."
raise SrMiseStaticOwnerError(emsg)
if self._owner.npars != owner.npars:
emsg = "New owner specifies different number of parameters than " + "original owner."
raise SrMiseStaticOwnerError(emsg)
self._owner = owner
[docs]
def compress(self):
"""Return part parameters with non-free values removed.
Returns
-------
pars : array-like
The compressed parameters of the model part."""
return self.pars[self.free]
[docs]
def jacobian(self, r, range=None):
"""Return jacobian of this part over r.
Parameters
----------
r : array-like
The input domain
range : slice object
The slice object specifying region of r and y over which to fit.
All the data by default.
Returns
-------
jacobian : array-like
The jacobian of the model part.
"""
return self._owner.jacobian(self, r, range)
[docs]
def owner(self):
"""Return the BaseFunction subclass instance which owns this part.
Returns
-------
BaseFunction subclass
The BaseFunction subclass which owns this part."""
return self._owner
[docs]
def update(self, freepars):
"""Sequentially update free parameters from freepars.
Parameters
----------
freepars : array-like
The sequence of new parameter values. May contain more
parameters than can actually be updated.
Returns
-------
numfree
number of parameters updated from freepars.
"""
numfree = self.npars(count_fixed=False)
if len(freepars) < numfree:
pass # raise "freepars does not have enough elements to
# update every unheld parameter."
# TODO: Check if I need to make copies here, or if references
# to parameters are safe.
self.pars[self.free] = freepars[:numfree]
return numfree
[docs]
def value(self, r, range=None):
"""Return value of peak over r.
Parameters
----------
r : array-like
The input domain
range : slice object
The slice object specifying region of r and y over which to fit.
All the data by default.
Returns
-------
value : array-like
The value of peak over r.
"""
return self._owner.value(self, r, range)
[docs]
def copy(self):
"""Return a deep copy of this ModelPart.
The original and the copy are completely independent, except they both
reference the same owner.
Returns
-------
ModelPart
A deep copy of this ModelPart.
"""
return type(self).__call__(self._owner, self.pars, self.free, self.removable, self.static_owner)
def __getitem__(self, key_or_idx):
"""Return parameter of peak corresponding with key_or_idx.
Parameters
----------
key_or_idx : Optional[int, slice, key]
The integer index, slice, or key from owner's parameter
dictionary.
Returns
-------
pars : array-like
The value of the peak corresponding to key_or_idx.
"""
if key_or_idx in self._owner.parameterdict:
return self.pars[self._owner.parameterdict[key_or_idx]]
else:
return self.pars[key_or_idx]
[docs]
def getfree(self, key_or_idx):
"""Return value of free corresponding with key_or_idx.
Parameters
----------
key_or_idx : Optional[int, slice object, key]
The integer index, slice, or key from owner's parameter
dictionary.
Returns
-------
freepars : array-like
The value of the free corresponding to key_or_idx.
"""
if key_or_idx in self._owner.parameterdict:
return self.free[self._owner.parameterdict[key_or_idx]]
else:
return self.free[key_or_idx]
[docs]
def setfree(self, key_or_idx, value):
"""Set value of free corresponding with key_or_idx.
Parameters
----------
key_or_idx : Optional[int, slice object, key]
The integer index, slice, or key from owner's parameter
dictionary.
value : bool
The boolean to set in free corresponding to key_or_idx.
Returns
-------
None
"""
if key_or_idx in self._owner.parameterdict:
self.free[self._owner.parameterdict[key_or_idx]] = value
else:
self.free[key_or_idx] = value
def __len__(self):
"""Return number of parameters, including any fixed ones."""
return self._owner.npars
[docs]
def npars(self, count_fixed=True):
"""Return total number of parameters in all parts.
Parameters
----------
count_fixed : bool
The boolean which determines if fixed parameters are
included in the count.
Returns
-------
int
The number of parameters in all parts."""
if count_fixed:
return self._owner.npars
else:
return np.sum(self.free)
def __str__(self):
"""Return string representation of ModelPart parameters."""
return str(self._owner.transform_parameters(self.pars, in_format="internal", out_format="default_output"))
def __eq__(self, other):
""" """
if hasattr(other, "_owner"):
return (
(self._owner is other._owner)
and np.all(self.pars == other.pars)
and np.all(self.free == other.free)
and self.removable == other.removable
)
else:
return False
def __ne__(self, other):
""" """
return not self == other
[docs]
def writestr(self, ownerlist):
"""Return string representation of ModelPart.
The value of owner is determined by its index in ownerlist.
Parameters
----------
ownerlist : array-like
The list of owner functions
Returns
-------
datastring
The string representation of ModelPart.
"""
if self._owner not in ownerlist:
emsg = "ownerlist does not contain this ModelPart's owner."
raise ValueError(emsg)
lines = []
lines.append("owner=%s" % repr(ownerlist.index(self._owner)))
# Lists/numpy arrays don't give full representation of long lists
lines.append("pars=[%s]" % ", ".join([repr(p) for p in self.pars]))
lines.append("free=[%s]" % ", ".join([repr(f) for f in self.free]))
lines.append("removable=%s" % repr(self.removable))
lines.append("static_owner=%s" % repr(self.static_owner))
datastring = "\n".join(lines) + "\n"
return datastring
# End of class ModelPart
# simple test code
if __name__ == "__main__":
pass