
#!/usr/bin/env python
##############################################################################
#
# SrMise            by Luke Granlund
#                   (c) 2014 trustees of the Michigan State University
#                   (c) 2024 trustees of Columbia University in the City of New York
#                   All rights reserved.
#
# File coded by:    Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################

import logging
import os.path
import re

import numpy as np

from diffpy.srmise import srmiselog
from diffpy.srmise.modelcluster import ModelCluster, ModelCovariance

# from diffpy.pdfgui.control.pdfdataset import PDFDataSet
from diffpy.srmise.pdfdataset import PDFDataSet
from diffpy.srmise.peakextraction import PeakExtraction
from diffpy.srmise.srmiseerrors import (
    SrMiseDataFormatError,
    SrMiseError,
    SrMiseQmaxError,
    SrMiseStaticOwnerError,
    SrMiseUndefinedCovarianceError,
)

logger = logging.getLogger("diffpy.srmise")


class PDFPeakExtraction(PeakExtraction):
    """PDFPeakExtraction extends the PeakExtraction class to specialize in
    extracting peaks from PDF (pair distribution function) data.

    Attributes
    ----------
    filename : str
        The source PDF file path.
    nyquist : bool, optional
        Specifies whether to fit the final model at the Nyquist sampling rate.
        Defaults to False.
    qmax : float, optional
        The maximum q value to use during peak extraction.  Use 0 to denote
        infinity.  Defaults to 0.
    qmax_reportedbypdf : float
        The qmax value read directly from the PDF file.
    qmax_fromdata : float
        The qmax value determined directly from the PDF data.
    scale : bool, optional
        Determines whether to use increased uncertainties during supersampling.
        This can expedite extraction by minimizing the detection of minuscule
        peaks, albeit at the risk of overlooking minor features.  When `nyquist`
        is True, uncertainties are scaled to maintain a similar chi-square error
        before and after resampling.  Defaults to False.
    supersample : int, optional
        Ensures the data are supersampled by at least this factor above the
        Nyquist rate before initiating peak extraction.  Defaults to 4.
    """

    def __init__(self):
        """Initialize the PDFPeakExtraction class."""
        newvars = ["qmax", "supersample", "nyquist", "scale"]
        PeakExtraction.__init__(self, newvars)
        return

    def loadpdf(self, pdf):
        """Load dataset.

        Parameters
        ----------
        pdf : PDFDataSet instance or str
            The PDFDataSet instance or a PDF file name.
        """
        self.clear()
        if isinstance(pdf, PDFDataSet):
            d = pdf
        else:
            d = PDFDataSet("ForPeakExtraction")
            d.read(pdf)
            self.filename = os.path.abspath(pdf)
        self.setdata(d.robs, d.Gobs, d.drobs, d.dGobs)
        self.qmax_reportedbypdf = d.qmax
        return

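    # A minimal usage sketch (the file name below is hypothetical):
    #
    #   pe = PDFPeakExtraction()
    #   pe.loadpdf("nickel.gr")   # reads r, G(r), uncertainties, and qmax
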
    def setdata(self, x, y, dx=None, dy=None):
        """Set data.

        Parameters
        ----------
        x : array-like
            The x-coordinates of the data.
        y : array-like
            The y-coordinates of the data.
        dx : array-like, optional
            The uncertainties in the x-coordinates.  Default is None.
        dy : array-like, optional
            The uncertainties in the y-coordinates.  Default is None.
        """
        PeakExtraction.setdata(self, x, y, dx, dy)
        try:
            self.qmax_fromdata = find_qmax(self.x, self.y)[0]
        except Exception:
            logger.info("Could not determine qmax from the data.")

    def clear(self):
        """Clear all members.

        The purpose of this method is to ensure the object is in a clean state."""
        # TODO: Clear additional members
        self.filename = None
        self.nyquist = None
        self.qmax = None
        self.qmax_reportedbypdf = None
        self.qmax_fromdata = None
        self.scale = None
        self.supersample = None
        PeakExtraction.clear(self)

    def setvars(self, quiet=False, **kwds):
        """Set one or more extraction variables.

        Parameters
        ----------
        quiet : bool, optional
            Log changes quietly.  Default is False.
        **kwds : keyword arguments
            Additional variables to set.  Possible keywords include:

            - cres : float
                The clustering resolution, must be greater than or equal to 0.
            - effective_dy : float
                The uncertainties actually used during extraction.  Aliased as "dg".
            - pf : list of PeakFunctionBase subclass instances
                Sequence of peak function base subclass instances.
            - baseline : Baseline or BaselineFunction instance
                Baseline instance, or BaselineFunction instance for built-in estimation.
            - error_method : ErrorEvaluator subclass instance
                Error evaluator subclass instance used to compare models.  Default is AIC.
            - initial_peaks : Peaks instance
                Peaks instance representing the peaks present at the start of extraction.
            - rng : tuple of (float, float)
                Specifies the least and greatest x-values over which to extract peaks.
            - qmax : float or "automatic"
                The qmax value for the pair distribution function (PDF).
                If set to "automatic", it is estimated from the data.
            - nyquist : bool
                Whether to use Nyquist sampling or not.
            - supersample : int
                Degree of supersampling above the Nyquist rate to use.
            - scale : bool
                Scale uncertainties on recursion when nyquist is True.
        """
        # Treat "dg" as an alias for "effective_dy"
        if "dg" in kwds:
            if "effective_dy" not in kwds:
                kwds["effective_dy"] = kwds.pop("dg")
            else:
                emsg = "Keyword 'dg' is an alias for 'effective_dy', cannot specify both."
                raise ValueError(emsg)
        if "qmax" in kwds:
            if kwds["qmax"] == "automatic":
                if self.qmax_fromdata is not None:
                    kwds["qmax"] = self.qmax_fromdata
                else:
                    emsg = "Qmax could not be automatically determined."
                    raise SrMiseQmaxError(emsg)
        if "supersample" in kwds:
            if kwds["supersample"] < 1.0:
                emsg = "Supersample must be a value of at least 1."
                raise ValueError(emsg)
        PeakExtraction.setvars(self, quiet, **kwds)

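    # Illustrative call, assuming a PDFPeakExtraction instance `pe` with data
    # already loaded; the values shown are examples, not defaults:
    #
    #   pe.setvars(qmax="automatic", nyquist=True, dg=0.05)
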
    def defaultvars(self, *args):
        """Set default values.

        Parameters
        ----------
        *args : str
            Names of extraction variables to reset to their default values.
            Any remaining variables are passed on to PeakExtraction.defaultvars().
        """
        nargs = list(args)

        # qmax preference: reported, then fromdata, then 0.
        if self.qmax is None or "qmax" in args:
            if self.qmax_reportedbypdf is not None:
                self.qmax = self.qmax_reportedbypdf
            elif self.qmax_fromdata is not None:
                self.qmax = self.qmax_fromdata
            else:
                self.qmax = 0.0
        if "qmax" in args:
            nargs.remove("qmax")

        # nyquist
        if self.nyquist is None or "nyquist" in args:
            self.nyquist = self.qmax > 0
        if "nyquist" in args:
            nargs.remove("nyquist")

        # scale
        if self.scale is None or "scale" in args:
            self.scale = False
        if "scale" in args:
            nargs.remove("scale")

        # supersample
        if self.supersample is None or "supersample" in args:
            self.supersample = 4.0
        if "supersample" in args:
            nargs.remove("supersample")

        # Override defaults from PeakExtraction
        if self.cres is None or "cres" in args:
            if self.qmax > 0:
                self.cres = np.pi / self.qmax
        if "cres" in args:
            nargs.remove("cres")

        if self.pf is None or "pf" in args:
            from diffpy.srmise.peaks.gaussianoverr import GaussianOverR

            self.pf = [GaussianOverR(0.7)]
        if "pf" in args:
            nargs.remove("pf")

        # Enable "dg" as an alias for "effective_dy"
        if "dg" in args and "effective_dy" not in args:
            nargs.append("effective_dy")

        # Set other defaults
        PeakExtraction.defaultvars(self, *nargs)

    def resampledata(self, dr, **kwds):
        """Return (x, y, error in x, effective error in y) resampled by interval dr.

        Uses the values of self.x, self.y, self.dx, and self.effective_dy.
        The range is constrained by self.rng.

        The effective error may be scaled if the class member scale is True.

        The uncertainties are "resampled" by interpolation, since insufficient
        information exists in a PDFPeakExtraction object to propagate them
        correctly on the new grid.

        Parameters
        ----------
        dr : float
            The sampling interval for resampling the data.
        **kwds : dict, optional
            Additional keyword arguments.

            - eps : float, default=1e-6
                Suppresses the information-loss warning when the difference
                between `dr` and the Nyquist interval `dr_nyquist` is less
                than `eps`.

        Returns
        -------
        tuple of ndarray
            A tuple containing the resampled (x, y, error in x, effective error in y).
        """
        self.defaultvars()  # Find correct range if necessary.
        eps = kwds.get("eps", 10**-6)

        if self.qmax == 0:
            logger.warning("Resampling when qmax=0.  Information may be lost!")
        else:
            dr_nyquist = np.pi / self.qmax
            # Not a robust epsilon test, but all physical Nyquist rates are of
            # the same order of magnitude.
            if dr - dr_nyquist > eps:
                logger.warning(
                    "Resampling at %s, below Nyquist rate of %s.  Information will be lost!" % (dr, dr_nyquist)
                )

        r = np.arange(max(self.x[0], self.rng[0]), min(self.x[-1], self.rng[1]), dr)
        y = resample(self.x, self.y, r)

        # TODO: Use a justified way to "resample" the uncertainties.
        # Even semi-justified would improve this ugly hack.
        if self.dx is None:
            r_error = None
        else:
            r_error = np.interp(r, self.x, self.dx)
        y_error = self.errorscale(dr) * np.interp(r, self.x, self.effective_dy)
        return (r, y, r_error, y_error)

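    # For example, with qmax = 25.0 (inverse angstroms) the Nyquist interval
    # is dr_nyquist = np.pi / 25.0, roughly 0.1257, so resampledata(np.pi / 25.0)
    # returns the data on the coarsest grid that still preserves all
    # information below qmax.
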
    def errorscale(self, dr):
        """Return the proper scale of uncertainties.

        Always returns 1 unless qmax > 0, Nyquist sampling is enabled, and
        scale is True.

        Parameters
        ----------
        dr : float
            The sampling interval.

        Returns
        -------
        float
            The scale factor for the uncertainties.
        """
        if self.qmax > 0 and self.nyquist and self.scale:
            dr_nyquist = np.pi / self.qmax
            return np.max([np.sqrt(dr_nyquist / dr), 1.0])
        else:
            return 1.0

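    # Worked example: with qmax = 25.0 the Nyquist interval is about 0.1257,
    # so for data supersampled at dr = 0.01 the scale factor is
    # sqrt(0.1257 / 0.01), approximately 3.5, when nyquist and scale are both
    # True.  In every other case the factor is 1.
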
    def extract(self, **kwds):
        """Extract peaks from the PDF.  Returns a ModelCovariance instance
        summarizing the results.

        Parameters
        ----------
        **kwds : dict
            Additional keyword arguments that influence the extraction
            process, such as `qmax`, `supersample`, and `nyquist`, which
            affect resampling and model refinement strategies.

        Returns
        -------
        ModelCovariance
            An instance of ModelCovariance summarizing the covariance of the
            extracted model parameters.
        """
        # TODO: The sanest way forward is to create a PeakExtraction object that does
        # the calculations for resampled data.  All the relevant extraction variables
        # can be carefully controlled this way as well.  Furthermore, it continues to
        # expose extract_single without change.  (Could also implement a keyword system
        # here to control certain values.)
        # TODO: Add extraction variables for how I control resampling.  Add these to
        # newvar, clean, defaultvar, etc.  In most cases the defaults are just fine.
        self.clearcalc()
        tracer = srmiselog.tracer
        tracer.pushc()

        # Make sure all required extraction variables have some value
        self.defaultvars()

        # Determine grid spacing
        dr_raw = (self.x[-1] - self.x[0]) / (len(self.x) - 1)

        logger.info("Extract using qmax=%s", self.qmax)

        if self.qmax > 0:
            dr_nyquist = np.pi / self.qmax

            if dr_raw > dr_nyquist:
                # Technically I should yell for dr_raw >= dr_nyquist, since information
                # loss may occur at equality.
                logger.warning(
                    "The input PDF appears to be missing information: The "
                    "sampling interval of the input PDF (%s) is larger than "
                    "the Nyquist interval (%s) defined by qmax=%s.  This information "
                    "is irretrievable." % (dr_raw, dr_nyquist, self.qmax)
                )
        else:
            # Do I actually need this?
            dr_nyquist = dr_raw  # not actually Nyquist sampling, natch

        # Define grids
        rngslice = self.getrangeslice()
        if self.qmax == 0:
            r1 = self.x[rngslice]
            y1 = self.y[rngslice]
            y_error1 = self.effective_dy[rngslice]
        else:
            if dr_nyquist / dr_raw < self.supersample:
                # Supersample the PDF for the initial extraction
                dr = dr_nyquist / self.supersample
                (r1, y1, r_error1, y_error1) = self.resampledata(dr)
            else:
                # The provided PDF is already sufficiently oversampled
                r1 = self.x[rngslice]
                y1 = self.y[rngslice]
                y_error1 = self.errorscale(dr_raw) * self.effective_dy[rngslice]

        # Set up initial extraction
        pe = PeakExtraction()
        pe.setdata(r1, y1, None, None)
        msg = [
            "Performing initial peak extraction",
            "----------------------------------",
        ]
        logger.info("\n".join(msg))
        pe.setvars(
            cres=self.cres,
            pf=self.pf,
            effective_dy=y_error1,
            baseline=self.baseline,
            error_method=self.error_method,
            initial_peaks=self.initial_peaks,
        )

        # Initial extraction
        pe.extract_single()
        ext = pe.extracted
        bl = pe.extracted.baseline

        # Prune model with termination ripples
        if self.qmax > 0:
            msg = [
                "Model after initial extraction.",
                "%s",
                "\n",
                "-----------------------------",
                "Adding termination ripples.",
            ]
            logger.info("\n".join(msg), ext)

            from diffpy.srmise.peaks.terminationripples import TerminationRipples

            owners = list(set([p.owner() for p in ext.model]))
            tfuncs = {}
            for o in owners:
                tfuncs[o] = TerminationRipples(o, self.qmax)
            for p in ext.model:
                try:
                    # TODO: Make this check more robust, or incorporate similar
                    # functionality into the design of peakfunctionbase.
                    # It is easy to imagine wrapping a peak function
                    # several times to achieve different effects, but it
                    # isn't necessarily clear when any given wrapping
                    # is inadmissible.
                    if not isinstance(p.owner(), TerminationRipples):
                        p.changeowner(tfuncs[p.owner()])
                except SrMiseStaticOwnerError:
                    pass

        # Use Nyquist-sampled data if desired
        if self.nyquist:
            logger.info("Applying Nyquist sampling.")

            # Models with more free parameters than data points cannot be fit
            # with chi-squared methods, so sometimes the existing model cannot
            # be pruned if the data are immediately resampled at the Nyquist
            # rate.  In that case, keep resampling/pruning until no more
            # parameters can be removed or until able to resample at the
            # Nyquist rate.  This either gives the correct statistics, or uses
            # the least amount of supersampling that can be mustered.  The
            # latter case usually indicates that effective_dy is too small.
            while True:
                totalfreepars = ext.npars(count_fixed=False)
                if totalfreepars >= (r1[-1] - r1[0]) / dr_nyquist:
                    dr_resample = 0.9999 * (r1[-1] - r1[0]) / totalfreepars
                else:
                    dr_resample = dr_nyquist

                logger.info(
                    "Data resampled at dr=%s.  (Nyquist rate=%s)",
                    dr_resample,
                    dr_nyquist,
                )

                (r2, y2, r_error2, y_error2) = self.resampledata(dr_resample)

                ext = ModelCluster(
                    ext.model,
                    bl,
                    r2,
                    y2,
                    y_error2,
                    None,
                    self.error_method,
                    self.pf,
                )
                ext.fit()  # Clean up parameters after resampling.
                ext.prune()

                if dr_resample == dr_nyquist:
                    logger.info("Nyquist sampling complete.")
                    break
                elif ext.npars(count_fixed=False) == totalfreepars:
                    msg = [
                        "Warning: Nyquist sampling could not be completed, too many free parameters.",
                        "The data have been oversampled by the least possible amount, but their",
                        "uncertainties are correlated and the model is probably overfit.",
                        "Data sampled at dr=%s",
                        "Nyquist rate=%s",
                        "Degree of oversampling: %s",
                    ]
                    logger.warning(
                        "\n".join(msg),
                        dr_resample,
                        dr_nyquist,
                        dr_nyquist / dr_resample,
                    )
                    break
        else:
            ext = ModelCluster(ext.model, bl, r1, y1, y_error1, None, self.error_method, self.pf)
            ext.prune()

        logger.info("Model after resampling and termination ripples:\n%s", ext)

        # Fit model with baseline, report covariance matrix
        cov = ModelCovariance()
        ext.fit(fitbaseline=True, cov=cov, cov_format="default_output")
        tracer.emit(ext)

        logger.info("Model after fitting with baseline:")
        try:
            logger.info(str(cov))
            # logger.info("Correlations > .8:\n%s", "\n".join(str(c) for c in cov.correlationwarning(.8)))
        except SrMiseUndefinedCovarianceError:
            logger.warning("Covariance not defined for final model.  Fit may not have converged.")
            logger.info(str(ext))

        # Update calculated instance variables
        self.extraction_type = "extract"
        self.extracted = ext

        tracer.popc()

        return cov

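    # A typical end-to-end sketch (the file names are hypothetical):
    #
    #   pe = PDFPeakExtraction()
    #   pe.loadpdf("sample.gr")
    #   pe.setvars(qmax="automatic", nyquist=True)
    #   cov = pe.extract()
    #   pe.writepwa("sample.pwa")
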
    def fit(self, **kwds):
        """Fit peaks in the PDF.  Returns a ModelCovariance instance
        summarizing the results.

        Parameters
        ----------
        **kwds : dict
            Keyword arguments passed to the ModelCovariance instance.
            Different keywords select different fitting strategies.  See the
            ModelCovariance class for more information.

        Returns
        -------
        ModelCovariance instance
            The fitted ModelCovariance instance.
        """
        self.clearcalc()

        # Make sure all required extraction variables have some value
        self.defaultvars()

        # Determine grid spacing
        dr_raw = (self.x[-1] - self.x[0]) / (len(self.x) - 1)

        logger.info("Fit using qmax=%s", self.qmax)

        if self.qmax > 0:
            dr_nyquist = np.pi / self.qmax

            if dr_raw > dr_nyquist:
                # Technically I should yell for dr_raw >= dr_nyquist, since information
                # loss may occur at equality.
                logger.warning(
                    "The input PDF appears to be missing information: The "
                    "sampling interval of the input PDF (%s) is larger than "
                    "the Nyquist interval (%s) defined by qmax=%s.  This information "
                    "is irretrievable." % (dr_raw, dr_nyquist, self.qmax)
                )
        else:
            # Do I actually need this?
            dr_nyquist = dr_raw  # not actually Nyquist sampling, natch

        # Define grids
        rngslice = self.getrangeslice()
        if self.qmax == 0 or not self.nyquist:
            r1 = self.x[rngslice]
            y1 = self.y[rngslice]
            y_error1 = self.effective_dy[rngslice]
        else:
            (r1, y1, r_error1, y_error1) = self.resampledata(dr_nyquist)

        # Set up for fit_single
        pe = PeakExtraction()
        pe.setdata(r1, y1, None, None)
        msg = ["Performing peak fitting", "-----------------------"]
        logger.info("\n".join(msg))
        pe.setvars(
            cres=self.cres,
            pf=self.pf,
            effective_dy=y_error1,
            baseline=self.baseline,
            error_method=self.error_method,
            initial_peaks=self.initial_peaks,
        )

        # Perform fit
        cov = pe.fit_single()
        ext = pe.extracted

        logger.info("Model after fitting with baseline:")
        logger.info(str(ext))
        try:
            logger.info(str(cov))
        except SrMiseUndefinedCovarianceError:
            logger.warning("Covariance not defined for final model.  Fit may not have converged.")

        # Update calculated instance variables
        self.extraction_type = "fit"
        self.extracted = ext

        return cov

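    # Sketch of fitting user-supplied peaks without a full extraction search,
    # assuming `peaks` is an existing Peaks instance:
    #
    #   pe.setvars(initial_peaks=peaks)
    #   cov = pe.fit()
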
    def writemetadata(self):
        """Return string representation of peak extraction from PDF."""
        lines = []
        lines.append("filename=%s" % repr(self.filename))
        lines.append("nyquist=%s" % repr(self.nyquist))
        lines.append("qmax=%s" % repr(self.qmax))
        lines.append("qmax_reportedbypdf=%s" % repr(self.qmax_reportedbypdf))
        lines.append("qmax_fromdata=%s" % repr(self.qmax_fromdata))
        lines.append("scale=%s" % repr(self.scale))
        lines.append("supersample=%s" % repr(self.supersample))

        datastring = "\n".join(lines) + "\n"
        return datastring

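    # The returned string has one "name=value" line per variable, for example
    # (values illustrative):
    #
    #   filename='/path/to/sample.gr'
    #   nyquist=True
    #   qmax=25.0
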
    def readmetadata(self, metastr):
        """Read metadata from string.

        Parameters
        ----------
        metastr : str
            Metadata string to read.

        Returns
        -------
        None
        """
        fields = (
            "filename",
            "nyquist",
            "qmax",
            "qmax_reportedbypdf",
            "qmax_fromdata",
            "scale",
            "supersample",
        )
        for field in fields:
            res = re.search(r"^%s=(.*)$" % field, metastr, re.M)
            if res:
                setattr(self, field, eval(res.groups()[0].strip()))
            else:
                emsg = "metastr does not match required field '%s'" % field
                raise SrMiseDataFormatError(emsg)

    def writepwa(self, filename, comments="n/a"):
        """Write string summarizing extracted peaks to file.

        Parameters
        ----------
        filename : str
            The name of the file to write.
        comments : str
            The comments to write.

        Returns
        -------
        None
        """
        datastring = self.writepwastr(comments)
        with open(filename, "w") as f:
            f.write(datastring)
        return

    def writepwastr(self, comments):
        """Return string of extracted peaks (position, width, area) in the PDF.

        There is not enough information to recreate the extracted peaks from
        this string.

        Parameters
        ----------
        comments : str
            The string added to the header containing notes about the output.

        Returns
        -------
        str
            The summary string of the extracted peaks.
        """
        if self.extracted is None:
            emsg = "Cannot write summary: Peak Extraction has not been performed."
            raise SrMiseError(emsg)

        import time
        from getpass import getuser

        from diffpy.srmise import __version__
        from diffpy.srmise.basefunction import BaseFunction

        lines = []

        # Header
        lines.extend(
            [
                "Summary written: " + time.ctime(),
                "produced by " + getuser(),
                "diffpy.srmise version %s" % __version__,
                "##### User comments",
            ]
        )

        tcomments = []
        for c in comments.splitlines():
            tcomments.append("# " + c)
        lines.extend(tcomments)

        lines.extend(
            [
                "##### PDF Peak Extraction Summary",
                "# The information below is not sufficient to replicate extraction.",
            ]
        )

        # PDF peak extraction metadata
        lines.append("## PDF metadata")
        lines.append("filename=%s" % self.filename)
        lines.append("nyquist=%s" % repr(self.nyquist))
        lines.append("qmax=%s" % repr(self.qmax))
        lines.append("qmax_reportedbypdf=%s" % repr(self.qmax_reportedbypdf))
        lines.append("qmax_fromdata=%s" % repr(self.qmax_fromdata))
        lines.append("scale=%s" % repr(self.scale))
        lines.append("supersample=%s" % repr(self.supersample))
        lines.append("")

        lines.append("## Peak extraction metadata")

        # Extraction range
        lines.append("Range=%s" % repr(self.rng))

        # Clustering resolution
        lines.append("cres=%g" % self.cres)

        # Average effective dy
        lines.append("effective_dy=%s (mean)" % np.mean(self.effective_dy))
        # Passing the entire array is something we're avoiding in the summary.
        lines.append("")

        # Initial peaks
        # I'm not sure what I want for this, but probably nothing.
        # lines.append('## Initial Peaks')
        # lines.append('')

        lines.append("## Model Quality")

        # Quality of fit
        lines.append("# Quality reported by ModelEvaluator: %s" % self.extracted.quality().stat)
        lines.append("# Free parameters in extracted peaks: %s" % self.extracted.model.npars(count_fixed=False))
        if self.baseline is not None:
            fblpars = self.baseline.npars(count_fixed=False)
        else:
            fblpars = 0
        lines.append("# Free parameters in baseline: %s" % fblpars)
        lines.append("# Length of data in final fit: %s" % len(self.extracted.r_data))

        # Model evaluator
        if self.error_method is None:
            lines.append("ModelEvaluator=None")
        else:
            lines.append("ModelEvaluator=%s" % self.error_method.__name__)
        lines.append("")

        # Generate lists of PeakFunctions and BaselineFunctions
        # so I can refer to them by index.
        allpf = []
        allbf = []
        if self.pf is not None:
            allpf.extend(self.pf)
        if self.initial_peaks is not None:
            allpf.extend([i.owner() for i in self.initial_peaks])
        if self.baseline is not None:
            if isinstance(self.baseline, BaseFunction):
                allbf.append(self.baseline)
            else:  # should be a ModelPart
                allbf.append(self.baseline.owner())
        if self.extracted is not None:
            allpf.extend(self.extracted.peak_funcs)
            allpf.extend([p.owner() for p in self.extracted.model])
            if self.extracted.baseline is not None:
                allbf.append(self.extracted.baseline.owner())
        allpf = list(set(allpf))
        allbf = list(set(allbf))
        safepf = BaseFunction.safefunctionlist(allpf)
        safebf = BaseFunction.safefunctionlist(allbf)

        # Baseline Functions
        lines.append("## Baseline Functions")
        lines.append("# Index  Type")
        for i, bf in enumerate(safebf):
            if bf.base is not None:
                base = "(base=%s)" % safebf.index(bf.base)
            else:
                base = ""
            lines.append("%s %s %s" % (i, bf.getmodule(), base))
        lines.append("")

        # Baseline
        lines.append("## Baseline")
        lines.append("# Parameters of the baseline, followed by a comment which")
        lines.append("# gives the index of the corresponding Baseline Function.")
        if self.extracted.baseline is None:
            lines.append("None")
        else:
            # Generate parameter labels from the baseline function's parameterdict
            blf = self.extracted.baseline.owner()
            if blf.npars > 0:
                parlbl = list(blf.parameterdict.keys())
                paridx = np.array(list(blf.parameterdict.values())).argsort()
                lines.append("# " + " ".join([str(parlbl[i]) for i in paridx]))
                blpars = " ".join([str(p) for p in self.extracted.baseline.pars])
            else:
                blpars = "(no parameters)"
            blidx = safebf.index(blf)
            lines.append("%s # %s" % (blpars, blidx))
        lines.append("")

        # Peak Functions
        lines.append("## Peak Functions")
        lines.append("# Index  Type")
        for i, pf in enumerate(safepf):
            if pf.base is not None:
                base = "(base=%s)" % safepf.index(pf.base)
            else:
                base = ""
            lines.append("%s %s %s" % (i, pf.getmodule(), base))
        lines.append("")

        # PWA
        lines.append("## Extracted Peaks")
        lines.append("# Parameters are given in the natural units of the data,")
        lines.append("# where width is measured as full-width at half maximum.")
        lines.append("# Each line is followed by a comment which gives the index")
        lines.append("# of the corresponding Peak Function.")
        lines.append("#L position fwhm area")
        for p in self.extracted.model:
            pf = p.owner()
            pwa = pf.transform_parameters(p, in_format="internal", out_format="pwa")
            pidx = pf.parameterdict["position"]
            widx = pf.parameterdict["width"]
            aidx = pf.parameterdict["area"]
            pfidx = safepf.index(pf)
            lines.append("%s %s %s # %s" % (pwa[pidx], pwa[widx], pwa[aidx], pfidx))

        datastring = "\n".join(lines) + "\n"
        return datastring

# end PDFPeakExtraction class
def resample(orig_r, orig_y, new_r):
    """Resample a sequence with the Whittaker-Shannon interpolation formula.

    Parameters
    ----------
    orig_r : array-like
        The r grid of the original sample.
    orig_y : array-like
        The data to resample.
    new_r : array-like
        The resampled r grid.

    Returns
    -------
    new_y : array-like
        The sequence of same type as new_r with the resampled values.
    """
    n = len(orig_r)
    dr = (orig_r[-1] - orig_r[0]) / (n - 1)

    if new_r[0] < orig_r[0]:
        logger.warning("Resampling outside original grid: %s (requested) < %s (original)" % (new_r[0], orig_r[0]))

    if new_r[-1] > orig_r[-1]:
        logger.warning(
            "Resampling outside original grid: %s (requested) > %s (original)" % (new_r[-1], orig_r[-1])
        )

    new_y = new_r * 0.0
    shift = orig_r[0] / dr

    for i, y in enumerate(orig_y):
        new_y += y * np.sinc(new_r / dr - (i + shift))

    return new_y

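# A minimal sketch of resample() on synthetic data (names illustrative):
#
#   r = np.linspace(0, 10, 101)        # original grid, dr = 0.1
#   y = np.sin(2 * np.pi * r / 5)
#   r_fine = np.linspace(0, 10, 401)   # denser grid
#   y_fine = resample(r, y, r_fine)    # band-limited interpolation
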
def find_qmax(r, y, showgraphs=False):
    """Determine approximate qmax from a PDF.

    Parameters
    ----------
    r : array-like
        The r values of the PDF.
    y : array-like
        The corresponding y values of the PDF.
    showgraphs : bool
        If True, the graphs are shown.

    Returns
    -------
    tuple
        The qmax of the PDF and its corresponding uncertainty.
    """
    if len(r) != len(y):
        emsg = "Argument arrays must have the same length."
        raise ValueError(emsg)

    dr = (r[-1] - r[0]) / (len(r) - 1)

    # Nyquist-sampled PDFs already have the minimum number of data points, so
    # they must be resampled so sudden changes in reciprocal-space amplitudes
    # can be observed.
    new_r = np.linspace(r[0], r[-1], 2 * len(r))
    new_y = resample(r, y, new_r)
    new_dr = (new_r[-1] - new_r[0]) / (len(new_r) - 1)

    yfft = np.imag(np.fft.fft(new_y))[: len(new_y) // 2]

    d_ratio = stdratio(yfft)

    # Negative of the 2nd (forward) difference of d_ratio.  There should be
    # a noticeable jump in the ratio at qmax.  Before and after qmax the ratio
    # smoothly rises (roughly linearly or as a second-order polynomial).  If
    # there is a large amount of extra data past qmax the ratio will fall
    # smoothly as d_left starts to include points past qmax.  Taking the 2nd
    # difference removes this smoothly varying part, leaving relatively sharp
    # peaks near the jump.  The jump in d_ratio (even if obscured) appears as
    # a negative peak in the 2nd difference, hence multiplication by -1.
    dder = -np.diff(d_ratio, 2)

    # The index of yfft which leads to the jump in d_ratio.  (Search dder in
    # reverse since we probably want the last maximum in it, in the unlikely
    # case it appears more than once, while argmax returns the first.)
    m_idx = len(dder) - np.argmax(dder[::-1]) + 1

    dq = 2 * np.pi / ((len(new_y) - 1) * new_dr)
    qmax = dq * m_idx

    # Calculate dq again for the original grid
    dq = 2 * np.pi / ((len(y) - 1) * dr)

    if showgraphs:
        import matplotlib.pyplot as plt

        v1 = np.max([m_idx - int(np.sqrt(m_idx)), 2])
        v2 = np.min([m_idx + int(np.sqrt(len(yfft) - 1 - m_idx)), len(d_ratio)])

        plt.ion()

        # DFT of the PDF
        plt.figure()
        plt.subplot(211)  # near obtained qmax
        plt.plot(dq * np.arange(v1, v2), yfft[v1:v2])
        plt.axvline(x=qmax, color="r")
        plt.suptitle("qmax: %s (dq=%s)" % (qmax, dq))
        plt.subplot(212)  # over whole range
        plt.plot(dq * np.arange(len(yfft)), yfft)
        plt.axvline(x=qmax, color="r")
        plt.show()
        plt.ioff()

        input()

    return (qmax, dq)

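# For instance, applied to a PDF generated with qmax = 25.0, find_qmax(r, gr)
# should return approximately (25.0, dq), where dq is set by the extent of the
# r grid.  The variable names here are illustrative.
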
def stdratio(data):
    """Calculate the ratio of standard deviations for runs of equal length in data.

    Uses a numerically stable online algorithm for calculating the standard
    deviation.

    Parameters
    ----------
    data : array-like
        The sequence of data values.

    Returns
    -------
    ndarray
        An array of length floor(len(data)/2) - 1.  The ith element is
        equivalent to std(data[:i+2]) / std(data[i+2:2*i+4]).
    """
    limit = int(np.floor(len(data) / 2))
    std_left = np.zeros(limit)
    std_right = np.zeros(limit)

    n = 0
    mean = 0.0
    m2 = 0.0

    # Update std_left.  This is the usual online algorithm.
    for i in range(limit):
        n = n + 1
        delta = data[i] - mean
        mean = mean + delta / n
        m2 = m2 + delta * (data[i] - mean)
        std_left[i] = m2

    n = 2
    mean = (data[2] + data[3]) / 2
    m2 = (data[2] - mean) ** 2 + (data[3] - mean) ** 2
    std_right[0] = 0.0
    std_right[1] = m2

    # Update std_right.  Remove one point from the left side of the run,
    # add two on the right.
    for i in range(2, limit):
        # Remove one from the left
        n = n - 1
        delta = data[i] - mean
        mean = mean - delta / n
        m2 = m2 - delta * (data[i] - mean)

        # Add two to the right
        n = n + 2
        mean_add = (data[2 * i] + data[2 * i + 1]) / 2
        m2_add = (data[2 * i] - mean_add) ** 2 + (data[2 * i + 1] - mean_add) ** 2
        delta = mean_add - mean
        mean = mean + (2 * delta) / n
        m2 = m2 + m2_add + (delta**2) * (2 * n - 4) / n

        std_right[i] = m2

    return np.sqrt(std_left[1:] / std_right[1:])

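# A small sketch of the jump that find_qmax() exploits (synthetic data, names
# illustrative):
#
#   rng = np.random.default_rng(0)
#   signal = rng.standard_normal(16)        # large spread
#   tail = 0.01 * rng.standard_normal(16)   # small spread past the cutoff
#   ratios = stdratio(np.concatenate([signal, tail]))
#   # ratios rise sharply once the right-hand run enters the quiet tail
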
# Redirect to main extraction script
if __name__ == "__main__":
    from diffpy.srmise.applications import extract

    extract.main()