# Source code for diffpy.srmise.peaks.terminationripples

#!/usr/bin/env python
##############################################################################
#
# SrMise            by Luke Granlund
#                   (c) 2014 trustees of the Michigan State University
#                   (c) 2024 trustees of Columbia University in the City of New York
#                   All rights reserved.
#
# File coded by:    Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################

import logging

import numpy as np
import scipy.fftpack as fp

from diffpy.srmise.peaks.base import PeakFunction

logger = logging.getLogger("diffpy.srmise")


class TerminationRipples(PeakFunction):
    """Methods for evaluation and parameter estimation of a peak function with termination ripples."""

    def __init__(self, base, qmax, extension=4.0, supersample=5.0, Cache=None):
        """Peak function constructor which adds termination ripples to an existing function.

        Unlike other peak functions, TerminationRipples can only be
        evaluated over a uniform grid, or at a single value using an ad
        hoc uniform grid defined by qmax, extension, and supersample.

        Parameters
        ----------
        base : PeakFunction instance
            The PeakFunction instance subclass.
        qmax : float
            The cut-off frequency in reciprocal space.
        extension : float
            How many multiples of 2pi/qmax to extend calculations in
            order to avoid edge effects. Default is 4.0.
        supersample : float
            Number of intervals over 2pi/qmax when a natural interval
            cannot be determined while extending calculations.
            Default is 5.0.
        Cache : class
            The class (not instance) which implements caching of
            PeakFunction evaluations."""
        self.base = base
        self.qmax = qmax
        self.extension = extension
        self.supersample = supersample
        # Record the construction arguments (with their repr serializers)
        # so the function can be written out and restored.
        metadict = {
            "qmax": (qmax, repr),
            "extension": (extension, repr),
            "supersample": (supersample, repr),
        }
        # The parameter layout and formats are inherited from the base
        # peak function; ripples do not add parameters of their own.
        PeakFunction.__init__(
            self,
            base.parameterdict,
            base.parformats,
            base.default_formats,
            metadict,
            base,
            Cache,
        )
        return

    # Methods required by PeakFunction ####

    # TODO: A smart way to convert from the base function's estimate to an
    # appropriate one when ripples are considered.  This may not be
    # necessary, though.
[docs] def estimate_parameters(self, r, y): """Estimate parameters for single peak from data provided. Uses estimation routine provided by base peak function. Parameters ---------- r : array-like Data along r from which to estimate y : array-like Data along y from which to estimate Returns ------- array-like Numpy array of parameters in the default internal format. Raises SrMiseEstimationError if parameters cannot be estimated for any reason. """ return self.base.estimate_parameters(r, y)
# TODO: Can this be implemented sanely for termination ripples?
[docs] def scale_at(self, pars, x, scale): """Change parameters so value(x)->scale*value(x) for the base function. Does not change position or height of peak's maxima. Raises SrMiseScalingError if the parameters cannot be scaled. Parameters ---------- pars : array-like The parameters corresponding to a single peak x : float The position of the border scale : float The size of scaling at x. Must be positive. Returns ------- array-like The numpy array of scaled parameters. """ return self.base.scale_at(pars, x, scale)
def _jacobianraw(self, pars, r, free): """Return Jacobian of base function with termination ripples. Parameters ---------- pars : array-like The sequence of parameters for a single peak r : array-like The sequence or scalar over which pars is evaluated free : array-like The sequence of booleans which determines which derivatives are needed. True for evaluation, False for no evaluation. Returns ------- array-like The Jacobian matrix of base function with termination ripples. """ return self.base._jacobianraw(pars, r, free) def _transform_derivativesraw(self, pars, in_format, out_format): """Return gradient matrix for the pars converted from in_format to out_format. Parameters ---------- pars : array-like The sequence of parameters in_format : str The format defined for base peak function out_format : str The format defined for base peak function Returns ------- ndarray The Jacobian matrix of base function with termination ripples with out_format. """ return self.base._transform_derivativesraw(pars, in_format, out_format) def _transform_parametersraw(self, pars, in_format, out_format): """Convert parameter values from in_format to out_format. Parameters ---------- pars : array-like The sequence of parameters in_format : str The format defined for base peak function out_format : str The format defined for base peak function Returns ------- array-like The sequence of parameter values with out_format. """ return self.base._transform_parametersraw(pars, in_format, out_format) def _valueraw(self, pars, r): """Return value of base peak function for the given parameters and r values. 
pars : array-like The sequence of parameters for a single peak r : array-like or float The sequence or scalar over which pars is evaluated Returns ------- float The value of base peak function for the given parameters and r.""" return self.base._valueraw(pars, r) # Overridden PeakFunction functions #### # jacobian() and value() are not normally overridden by PeakFunction # subclasses, but are here to minimize the effect of edge-effects while # introducing termination ripples.
[docs] def jacobian(self, peak, r, rng=None): """Calculate (rippled) jacobian, possibly restricted by range. Parameters ---------- peak : PeakFunction instance The Peak to be evaluated r : array-like The sequence or scalar over which peak is evaluated rng : slice object Optional slice object restricts which r-values are evaluated. The output has same length as r, but unevaluated objects have a default value of 0. If caching is enabled these may be previously calculated values instead. Default is None Returns ------- jac : array-like The Jacobian of base function with termination ripples.""" if self is not peak._owner: raise ValueError( "Argument 'peak' must be evaluated by the " "PeakFunction subclass instance with which " "it is associated." ) # normally r will be a sequence, but also allow single numeric values try: if len(r) > 1: dr = (r[-1] - r[0]) / (len(r) - 1) else: # dr is ad hoc if r is a single point dr = 2 * np.pi / (self.supersample * self.qmax) if rng is None: rng = slice(0, len(r)) rpart = r[rng] (ext_r, ext_slice) = self.extend_grid(rpart, dr) jac = self._jacobianraw(peak.pars, ext_r, peak.free) output = [None for j in jac] for idx in range(len(output)): if jac[idx] is not None: jac[idx] = self.cut_freq(jac[idx], dr) output[idx] = r * 0.0 output[idx][rng] = jac[idx][ext_slice] return output except TypeError: # dr is ad hoc if r is a single point. dr = 2 * np.pi / (self.supersample * self.qmax) (ext_r, ext_slice) = self.extend_grid(np.array([r]), dr) jac = self._jacobianraw(peak.pars, ext_r, peak.free) for idx in range(len(output)): if jac[idx] is not None: jac[idx] = self.cut_freq(jac[idx], dr)[ext_slice][0] return jac
[docs] def value(self, peak, r, rng=None): """Calculate (rippled) value of peak, possibly restricted by range. This function overrides its counterpart in PeakFunction in order to minimize the impact of edge-effects from introducing termination ripples into an existing peak function. Parameters ---------- peak : Peak instance The Peak to be evaluated r : array-like The sequence or scalar over which peak is evaluated rng : slice object Optional slice object restricts which r-values are evaluated. The output has same length as r, but unevaluated objects have a default value of 0. If caching is enabled these may be previously calculated values instead. Default is None. Returns ------- output : array-like The (rippled) value of peak, possibly restricted by range. """ if self is not peak._owner: raise ValueError( "Argument 'peak' must be evaluated by the " "PeakFunction subclass instance with which " "it is associated." ) # normally r will be a sequence, but also allow single numeric values dr_super = 2 * np.pi / (self.supersample * self.qmax) if np.isscalar(r): # dr is ad hoc if r is a single point. (ext_r, ext_slice) = self.extend_grid(np.array([r]), dr_super) value = self._valueraw(peak.pars, ext_r) value = self.cut_freq(value, dr_super) return value[ext_slice][0] else: if rng is None: rng = slice(0, len(r)) output = r * 0.0 # Make sure the actual dr used for finding termination ripples # is at least as fine as dr_super, while still calculating the # function at precisely the requested points. # When the underlying function is sampled too coarsely it can # miss critical high frequency components and return a very # poor approximation to the continuous case. The actual fineness # of sampling needed to avoid the worst of these discretization # issues is difficult to determine without detailed knowledge # of the underlying function. 
dr = (r[-1] - r[0]) / (len(r) - 1) segments = np.ceil(dr / dr_super) dr_segmented = dr / segments rpart = r[rng] if segments > 1: rpart = np.arange(rpart[0], rpart[-1] + dr_segmented / 2, dr_segmented) (ext_r, ext_slice) = self.extend_grid(rpart, dr_segmented) value = self._valueraw(peak.pars, ext_r) value = self.cut_freq(value, dr_segmented) output[rng] = value[ext_slice][::segments] return output
[docs] def getmodule(self): return __name__
# Other methods ####
[docs] def cut_freq(self, sequence, delta): """Remove high-frequency components from sequence. This is equivalent to the discrete convolution of a signal with a sinc function sin(2*pi*r/qmax)/r. Parameters ---------- sequence : array-like The sequence to alter. delta : int The spacing between elements in sequence. Returns ------- array-like The sequence with high-frequency components removed. """ padlen = int(2 ** np.ceil(np.log2(len(sequence)))) padseq = fp.fft(sequence, padlen) dq = 2 * np.pi / ((padlen - 1) * delta) lowidx = int(np.ceil(self.qmax / dq)) hiidx = padlen + 1 - lowidx # Remove hi-frequency components padseq[lowidx:hiidx] = 0 padseq = fp.ifft(padseq) return np.real(padseq[0 : len(sequence)])
[docs] def extend_grid(self, r, dr): """Return (extended r, slice giving original range). Parameters ---------- r : array-like or float The sequence or scalar over which peak is evaluated dr : array-like or float The uncertainties over which peak is evaluated Returns ------- tuple The extended r, slice giving original range.""" ext = self.extension * 2 * np.pi / self.qmax left_ext = np.arange(r[0] - dr, max(0.0, r[0] - ext - dr), -dr)[::-1] right_ext = np.arange(r[-1] + dr, r[-1] + ext + dr, dr) ext_r = np.concatenate((left_ext, r, right_ext)) ext_slice = slice(len(left_ext), len(ext_r) - len(right_ext)) return (ext_r, ext_slice)
# end of class TerminationRipples

# simple test code
if __name__ == "__main__":

    import matplotlib.pyplot as plt
    from numpy.random import randn

    from diffpy.srmise.modelcluster import ModelCluster
    from diffpy.srmise.modelevaluators.aicc import AICc
    from diffpy.srmise.peaks.base import Peaks
    from diffpy.srmise.peaks.gaussianoverr import GaussianOverR

    res = 0.01
    r = np.arange(2, 4, res)
    err = np.ones(len(r))  # default unknown errors
    pf1 = GaussianOverR(0.7)
    pf2 = TerminationRipples(pf1, 20.0)
    # Note: the unused local `evaluator = AICc()` was removed; the AICc
    # *class* (not an instance) is what ModelCluster expects below.

    pars = [[3, 0.2, 10], [3.5, 0.2, 10]]
    ideal_peaks = Peaks([pf1.actualize(p, "pwa") for p in pars])
    ripple_peaks = Peaks([pf2.actualize(p, "pwa") for p in pars])
    y_ideal = ideal_peaks.value(r)
    # Rippled signal with additive Gaussian noise.
    y_ripple = ripple_peaks.value(r) + 0.1 * randn(len(r))

    guesspars = [[2.7, 0.15, 5], [3.7, 0.3, 5]]
    guess_peaks = Peaks([pf2.actualize(p, "pwa") for p in guesspars])
    cluster = ModelCluster(guess_peaks, r, y_ripple, err, None, AICc, [pf2])

    qual1 = cluster.quality()
    print(qual1.stat)

    cluster.fit()
    yfit = cluster.calc()
    qual2 = cluster.quality()
    print(qual2.stat)

    plt.figure(1)
    plt.plot(r, y_ideal, r, y_ripple, r, yfit)
    plt.show()