#!/usr/bin/env python
##############################################################################
#
# SrMise by Luke Granlund
# (c) 2014 trustees of the Michigan State University
# (c) 2024 trustees of Columbia University in the City of New York
# All rights reserved.
#
# File coded by: Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################
import logging
import numpy as np
from diffpy.srmise.basefunction import BaseFunction
from diffpy.srmise.modelparts import ModelPart, ModelParts
from diffpy.srmise.srmiseerrors import SrMiseDataFormatError, SrMiseScalingError
logger = logging.getLogger("diffpy.srmise")
[docs]
class PeakFunction(BaseFunction):
"""Base class for functions which represent peaks.
Class members
-------------
parameterdict: A dictionary mapping string keys to their index in the
sequence of parameters. The "position" key is required,
while all others are arbitrary. These keys apply only to
the default "internal" format.
parformats: A sequence of strings defining what formats are recognized
by a peak function.
default_formats: A dictionary which maps the strings "default_input" and
"default_output" to strings also appearing in parformats.
"default_input"-> format used internally within the class
"default_output"-> Default format to use when converting
parameters for outside use.
Class methods (implemented by inheriting classes)
-------------------------------------------------
estimate_parameters()
scale_at()
_jacobianraw() (optional, but strongly recommended)
_transform_derivativesraw() (optional, supports propagation of uncertainty for different paramaterizations)
_transform_parametersraw()
_valueraw()
Class methods
-------------
actualize()
Inherited methods
-----------------
jacobian()
value()
transform_derivatives()
transform_parameters()
"""
def __init__(
self,
parameterdict,
parformats,
default_formats,
metadict,
base=None,
Cache=None,
):
"""Set parameterdict defined by subclass
parameterdict: A dictionary mapping string keys to their index in a
sequence of parameters for this PeakFunction subclass.
The key "position" is required.
parformats: A sequence strings containing all allowed input/output
formats defined for the peak function's parameters.
default_formats: A dictionary mapping the string keys "internal" and
"default_output" to formats from parformats.
metadict: Dictionary mapping string keys to tuple (v, m) where v is an
additional argument required by function, and m is a method
whose string output recreates v when passed to eval().
base: A basefunction subclass instance which this one decorates with
additional functionality.
Cache: A class (not instance) which implements caching of BaseFunction
evaluations."""
if "position" not in parameterdict:
emsg = "Argument parameterdict missing required key 'position'."
raise ValueError(emsg)
BaseFunction.__init__(self, parameterdict, parformats, default_formats, metadict, base, Cache)
# # "Virtual" class methods ####
[docs]
def scale_at(self, peak, x, scale):
emsg = "scale_at must be implemented in a PeakFunction subclass."
raise NotImplementedError(emsg)
# # Methods required by BaseFunction ####
[docs]
def actualize(
self,
pars,
in_format="default_input",
free=None,
removable=True,
static_owner=False,
):
converted = self.transform_parameters(pars, in_format, out_format="internal")
return Peak(self, converted, free, removable, static_owner)
[docs]
def getmodule(self):
return __name__
# end of class PeakFunction
[docs]
class Peaks(ModelParts):
"""A collection for Peak objects."""
def __init__(self, *args, **kwds):
# Check that args[0] (if it exists) is an instance of Peaks?
ModelParts.__init__(self, *args, **kwds)
[docs]
def argsort(self, key="position"):
"""Return sequence of indices which sort peaks in order specified by key."""
keypars = np.array([p[key] for p in self])
# In normal use the peaks will already be sorted, so check for it.
sorted = True
for i in range(len(keypars) - 1):
if keypars[i] > keypars[i + 1]:
sorted = False
break
if not sorted:
return keypars.argsort().tolist()
else:
return range(len(keypars))
[docs]
def match_at(self, x, y):
"""Alter peaks so their sum at x is y, preserving each peak's maximum.
Each peak is scaled equally. Peaks with fixed parameters, a maximum
very close to x, or other issues may prevent optimal results. If the
peaks cannot be scaled at all they are left unchanged.
Parameters:
x: (float) Position at which to match.
y: (float) Height to match.
Returns True if one or more peaks was scaled, False otherwise.
"""
height = self.value(x)
if height == 0:
return False
orig = self.copy()
try:
scale = y / height
# First attempt at scaling peaks. Record which peaks, if any,
# were not scaled in case a second attempt is required.
scaled = []
all_scaled = True
any_scaled = False
fixed_height = 0.0
for peak in self:
scaled.append(peak.scale_at(x, scale))
all_scaled = all_scaled and scaled[-1]
any_scaled = any_scaled or scaled[-1]
if not scaled[-1]:
fixed_height += peak.value(x)
# Second attempt at scaling peaks.
if not all_scaled and fixed_height < y and fixed_height < height:
self[:] = orig[:]
any_scaled = False
scale = (y - fixed_height) / (height - fixed_height)
for peak, s in (self, scaled):
if s:
# "or" is short-circuited, so scale_at() must be first
# to guarantee it is called.
any_scaled = peak.scale_at(x, scale) or any_scaled
except Exception as e:
logger.debug("An exception prevented matching -- %s", e)
self[:] = orig[:]
return False
return any_scaled
[docs]
def sort(self, reverse=False, key="position"):
"""Sort peaks in order specified by key."""
keypars = np.array([p[key] for p in self])
order = keypars.argsort()
self[:] = [self[idx] for idx in order]
return
# End of class Peaks
[docs]
class Peak(ModelPart):
"""Represents a single peak associated with a PeakFunction subclass."""
def __init__(self, owner, pars, free=None, removable=True, static_owner=False):
"""Set instance members.
owner: an instance of a PeakFunction subclass
pars: Sequence of parameters which define a single peak
free: Sequence of Boolean variables. If False, the corresponding
parameter will not be changed.
removable: Boolean determines whether this peak can be removed.
static_owner: (False) Whether or not the owner can be changed with
changeowner()
Note that free and removable are not mutually exclusive. If any
values are not free but removable=True then the entire peak may be
removed during peak extraction, but the held parameters for this
peak will remain unchanged until that point.
"""
ModelPart.__init__(self, owner, pars, free, removable, static_owner)
[docs]
def scale_at(self, x, scale):
"""Change parameters so value(x)->scale*value(x).
Does not change position or height of peak's maxima. If parameters
that are not free would be changed, or violates other constraints,
the peak is not adjusted.
Parameters
x: (float) Position of the border
scale: (float > 0) Amount by which to scale.
Returns True if parameters were scaled, False otherwise.
"""
# Reminder: Bitwise operators "&" and "~" work element-wise with
# numpy arrays.
# Check for no free parameters.
if np.all(~self.free):
return False
try:
adj_pars = self._owner.scale_at(self.pars, x, scale)
except SrMiseScalingError as err:
logger.debug("Cannot scale peak:", err)
return False
# Check if a fixed parameter was changed.
if np.any((self.pars != adj_pars) & ~self.free):
logger.debug("Cannot scale peak: a fixed parameter was changed")
return False
self.pars = adj_pars
return True
[docs]
@staticmethod
def factory(peakstr, ownerlist):
"""Instantiate a Peak from a string.
Parameters:
peakstr: string representing peak
ownerlist: List of BaseFunctions that owner is in
"""
data = peakstr.strip().splitlines()
# dictionary of parameters
pdict = {}
for d in data:
parse_value = d.split("=", 1)
if len(parse_value) == 2:
try:
pdict[parse_value[0]] = eval(parse_value[1])
except Exception:
emsg = "Invalid parameter: %s" % d
raise SrMiseDataFormatError(emsg)
else:
emsg = "Invalid parameter: %s" % d
raise SrMiseDataFormatError(emsg)
# Correctly initialize the base function, if one exists.
idx = pdict["owner"]
if idx > len(ownerlist):
emsg = "Dependent base function not in ownerlist."
raise ValueError(emsg)
pdict["owner"] = ownerlist[idx]
return Peak(**pdict)
# End of class Peak
# simple test code
if __name__ == "__main__":
import matplotlib.pyplot as plt
from numpy.random import randn
from diffpy.srmise.modelcluster import ModelCluster
from diffpy.srmise.modelevaluators.aicc import AICc
from diffpy.srmise.peaks.gaussianoverr import GaussianOverR
res = 0.01
r = np.arange(2, 4, res)
err = np.ones(len(r)) # default unknown errors
pf = GaussianOverR(0.7)
evaluator = AICc()
pars = [[3, 0.2, 10], [3.5, 0.2, 10]]
ideal_peaks = Peaks([pf.actualize(p, "pwa") for p in pars])
y = ideal_peaks.value(r) + 0.1 * randn(len(r))
guesspars = [[2.7, 0.15, 5], [3.7, 0.3, 5]]
guess_peaks = Peaks([pf.actualize(p, "pwa") for p in guesspars])
cluster = ModelCluster(guess_peaks, r, y, err, None, AICc, [pf])
qual1 = cluster.quality()
print(qual1.stat)
cluster.fit()
yfit = cluster.calc()
qual2 = cluster.quality()
print(qual2.stat)
plt.figure(1)
plt.plot(r, y, r, yfit)
plt.show()