#!/usr/bin/env python
##############################################################################
#
# SrMise by Luke Granlund
# (c) 2014 trustees of the Michigan State University
# (c) 2024 trustees of Columia University in the City of New York
# All rights reserved.
#
# File coded by: Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################
import matplotlib.pyplot as plt
import numpy as np
from diffpy.srmise.modelcluster import ModelCluster
from diffpy.srmise.pdfpeakextraction import PDFPeakExtraction
# This is a total hack-job right now, and isn't suitable for
# general use. Limitations include:
# 1) Only works with PDFPeakExtraction, not PeakExtraction
# 2) Only constant uncertainties are supported
# 3) Not using srmiselog
# 4) Really ugly kluge to allow FromSequence to pickle.
[docs]
class PeakStability:
"""Utility to test robustness of peaks.
results: [error scalar, model, bl, dr]
ppe: a PDFPeakExtraction instance"""
def __init__(self):
self.results = []
self.ppe = None
self.current = None
[docs]
def setppe(self, ppe):
self.ppe = ppe
[docs]
def load(self, filename):
try:
import cPickle as pickle
except ImportError:
import pickle
in_s = open(filename, "rb")
try:
(self.results, ppestr) = pickle.load(in_s)
self.ppe = PDFPeakExtraction()
self.ppe.readstr(ppestr)
# Ugly kluge for the baseline, since FromSequence
# can't pickle.
for r in self.results:
bl = self.ppe.baseline
kwds = r[2]
if r[2] is not None:
kwds = r[2]
if hasattr(bl, "estimate_parameters"):
r[2] = bl.actualize(default_input="internal", **kwds)
else:
r[2] = bl.owner().actualize(in_format="internal", **kwds)
finally:
in_s.close()
self.setcurrent(0)
[docs]
def save(self, filename):
try:
import cPickle as pickle
except ImportError:
import pickle
out_s = open(filename, "wb")
try:
# Write to the stream
outstr = self.ppe.writestr()
# ugly kluge to let FromSequence pickle
# (it stores xyrepr() in metadict)
results2 = []
for r in self.results:
if r[2] is None:
bldict = None
else:
bldict = {
"pars": r[2].pars,
"free": r[2].free,
"removable": r[2].removable,
"static_owner": r[2].static_owner,
}
results2.append([r[0], r[1], bldict, r[3]])
pickle.dump([results2, outstr], out_s)
finally:
out_s.close()
[docs]
def plotseries(self, style="o", **kwds):
plt.figure()
plt.ioff()
for e, r, bl, dr in self.results:
peakpos = [p["position"] for p in r]
es = [e] * len(peakpos)
plt.plot(peakpos, es, style, **kwds)
plt.ion()
plt.draw()
[docs]
def plot(self, **kwds):
"""Plot the current model. Keywords passed to pyplot.plot()"""
plt.clf()
plt.plot(*self.ppe.extracted.plottable(), **kwds)
q = self.ppe.extracted.quality()
plt.suptitle(
"[%i/%i]\n"
"Uncertainty: %6.3f. Peaks: %i.\n"
"Quality: %6.3f. Chi-square: %6.3f"
% (
self.current + 1,
len(self.results),
self.ppe.effective_dy[0],
len(self.ppe.extracted.model),
q.stat,
q.chisq,
)
)
[docs]
def setcurrent(self, idx):
"""Make the idxth model the active one.
Parameters
----------
idx : int
The index of the model to be tested.
Returns
-------
None
"""
self.current = idx
if idx is not None:
result = self.results[idx]
self.ppe.setvars(quiet=True, effective_dy=result[0] * np.ones(len(self.ppe.x)))
(r, y, dr, dy) = self.ppe.resampledata(result[3])
self.ppe.extracted = ModelCluster(
result[1], result[2], r, y, dy, None, self.ppe.error_method, self.ppe.pf
)
else:
self.ppe.clearcalc()
[docs]
def animate(self, results=None, step=False, **kwds):
"""Show animation of extracted peaks from first to last.
Keywords passed to pyplot.plot()
Parameters
----------
step : bool
Require keypress to show next plot
results array-like
The indices of results to show
"""
if results is None:
results = range(len(self.results))
oldcurrent = self.current
self.setcurrent(0)
plt.ion()
plt.plot(*self.ppe.extracted.plottable())
plt.axis()
for i in results:
self.setcurrent(i)
plt.ioff()
self.plot(**kwds)
plt.ion()
plt.draw()
if step:
input()
self.setcurrent(oldcurrent)
[docs]
def run(self, err, savecovs=False):
"""Running the uncertainty for the results.
Parameters
----------
err : array-like
The sequence of uncertainties to run at.
savecovs : bool
boolean to determine to save covariance matrix. Default is False.
If savecovs is True, return the covariance matrix for each final fit."""
self.results = []
covs = []
for i, e in enumerate(err):
print("---- Running for uncertainty %s (%i/%i) ----" % (e, i, len(err)))
self.ppe.clearcalc()
self.ppe.setvars(effective_dy=e)
if savecovs:
covs.append(self.ppe.extract())
else:
self.ppe.extract()
dr = (self.ppe.extracted.r_cluster[-1] - self.ppe.extracted.r_cluster[0]) / (
len(self.ppe.extracted.r_cluster) - 1
)
self.results.append([e, self.ppe.extracted.model, self.ppe.extracted.baseline, dr])
for e, r, bl, dr in self.results:
print("---- Results for uncertainty %s ----" % e)
print(r)
return covs