Source code for diffpy.srmise.srmiselog

#!/usr/bin/env python
##############################################################################
#
# SrMise            by Luke Granlund
#                   (c) 2014 trustees of the Michigan State University
#                   (c) 2024 trustees of Columia University in the City of New York
#                   All rights reserved.
#
# File coded by:    Luke Granlund
#
# See LICENSE.txt for license information.
#
##############################################################################
"""Controls the logging and plotting options of diffpy.srmise.

By default messages are logged to stdout, but logging to a file is also supported.
SrMiseLog defines five levels of message importance, and all messages at least
as important as the set level are displayed and/or written to the appropriate file.
Levels are specified as non-negative integers or equivalent strings, and are identical
to those found in the Python logging package.

'debug' -> 10
'info' -> 20 (default)
'warning' -> 30
'error' -> 40
'critical' -> 50

Liveplotting plots the value of a model each time it is fit, showing results for
before and after fitting.  User input is optionally required to procede after
fitting.

Functions
---------
addfilelog: Send logging information to a file.
liveplotting: Set whether to use liveplotting, and whether to wait after each liveplot
setfilelevel: Set logging level for the file logger.
setlevel: Set logging level of default logger.
gettracer: Get a TracePeaks instance for tracing peak extraction.
"""

import logging
import os.path
import re

from diffpy.srmise.srmiseerrors import SrMiseDataFormatError, SrMiseFileError, SrMiseLogError

# Default settings ###
defaultformat = "%(message)s"
defaultlevel = logging.INFO

LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}

# Set up logging to stdout ###
logger = logging.getLogger("diffpy.srmise")
logger.setLevel(defaultlevel)
ch = logging.StreamHandler()
ch.setLevel(defaultlevel)

formatter = logging.Formatter(defaultformat)
ch.setFormatter(formatter)

logger.addHandler(ch)

# Optional file logger ###
fh = None

# Make updated plots as fitting progresses. ###
liveplots = False
wait = False


[docs] def addfilelog(filename, level=defaultlevel, format=defaultformat): """Log output from diffpy.srmise in specified file. Parameters filename: Name of file to receiving output level: The logging level format: A string defining format of output messages conforming to logging package. """ global fh fh = logging.FileHandler(filename) fh.setLevel(level) formatter = logging.Formatter(format) fh.setFormatter(formatter) logger.addHandler(fh)
[docs] def setfilelevel(level): """Set level of file logger. Parameters level: The logging level.""" global fh if fh is not None: level = LEVELS.get(level, level) fh.setLevel(level) if level < logger.getEffectiveLevel(): logger.setLevel(level) else: emsg = "File handler does not exist, cannot set its level." raise SrMiseLogError(emsg)
[docs] def setlevel(level): """Set level of default (stdout) logger. Parameters level: The logging level.""" global ch level = LEVELS.get(level, level) ch.setLevel(level) if level < logger.getEffectiveLevel(): logger.setLevel(level)
[docs] def liveplotting(lp, w=False): """Set whether or not to use live plotting. When using liveplotting, a plot will be shown and updated as extraction progresses. Parameters lp: Use live plotting (True) or not (False). w: (False) Whether to wait for user after plotting.""" global liveplots global wait if lp is True or lp is False: liveplots = lp else: emsg = "Parameter lp must be a boolean." raise ValueError(emsg) if w is True or w is False: wait = w else: emsg = "Parameter w must be a boolean." raise ValueError(emsg)
# TracePeaks. Primary purpose is to enable creating movies. ###
[docs] class TracePeaks(object): """Output trace information during peak extraction.""" def __init__(self, **kwds): self.__filter = None self.filter = kwds.get("filter", ["False"]) self.filebase = kwds.get("filebase", None) self.store = kwds.get("store", False) self.trace = [] self.counter = None self.recursion = None self.call = None self.reset_trace() if self.filebase: dir = os.path.dirname(self.filebase) if dir and not os.path.exists(dir): os.makedirs(dir) return
[docs] def emit(self, *args, **kwds): """Write current trace to file. Parameters Any number of ModelCluster instances""" if not eval(self.filter): return else: trace = self.maketrace(*args, **kwds) if self.filebase: self.write(trace) if self.store: self.trace.append(trace) self.counter += 1 return
[docs] def maketrace(self, *args, **kwds): """Return dictionary of trace properties. Keywords model - Use specified model (Peaks instance) instead of those in args. """ mc = args[0].copy() mc.slice = None mc.change_slice(slice(len(mc.r_data))) clusters = [] for m in args: clusters.append(m.slice) for m in args[1:]: mc.replacepeaks(m.model) return { "mc": mc, "clusters": clusters, "recursion": self.recursion, "counter": self.counter, }
[docs] def writestr(self, trace): """Return string representation of current trace.""" lines = [] lines.append("### Trace") lines.append("counter=%i" % trace["counter"]) lines.append("recursion=%i" % trace["recursion"]) lines.append("clusters=%s" % trace["clusters"]) lines.append("### ModelCluster") lines.append(trace["mc"].writestr()) return "\n".join(lines)
[docs] def write(self, trace): """Write current trace to file.""" filename = "%s_%i" % (self.filebase, trace["counter"]) f = open(filename, "w") bytes = self.writestr(trace) f.write(bytes) f.close()
[docs] def read(self, filename): """Read tracer ModelCluster from file. Parameters filename - file from which to read Returns dictionary with keys "clusters" - List of cluster regions [[r0,r1],[r2,r3],...] "counter" - The count when object was created "mc" - A ModelCluster instance "recursion" - The recursion level of mc""" try: return self.readstr(open(filename, "rb").read()) except SrMiseDataFormatError as err: logger.exception("") basename = os.path.basename(filename) emsg = ("Could not open '%s' due to unsupported file format " + "or corrupted data. [%s]") % ( basename, err, ) raise SrMiseFileError(emsg)
[docs] def readstr(self, datastring): """Read tracer ModelCluster from string. Parameters datastring - The string representation of a trace Returns dictionary with keys "clusters" - List of cluster regions [[r0,r1],[r2,r3],...] "counter" - The count when object was created "mc" - A ModelCluster instance "recursion" - The recursion level of mc""" # find where the ModelCluster section starts res = re.search(r"^#+ ModelCluster\s*(?:#.*\s+)*", datastring, re.M) if res: header = datastring[: res.start()] mc = datastring[res.end() :].strip() else: emsg = "Required section 'ModelCluster' not found." raise SrMiseDataFormatError(emsg) # instantiate ModelCluster if re.match(r"^None$", mc): mc = None else: from diffpy.srmise.modelcluster import ModelCluster mc = ModelCluster.factory(mc) res = re.search(r"^clusters=(.*)$", header, re.M) if res: clusters = eval(res.groups()[0].strip()) else: emsg = "Required field 'clusters' not found." raise SrMiseDataFormatError(emsg) res = re.search(r"^recursion=(.*)$", header, re.M) if res: eval(res.groups()[0].strip()) else: emsg = "Required field 'recursion' not found." raise SrMiseDataFormatError(emsg) res = re.search(r"^counter=(.*)$", header, re.M) if res: eval(res.groups()[0].strip()) else: emsg = "Required field 'counter' not found." raise SrMiseDataFormatError(emsg) return { "mc": mc, "clusters": clusters, "recursion": self.recursion, "counter": self.counter, }
[docs] def pushr(self): """Enter a layer of recursion, and return new level.""" self.recursion += 1 return self.recursion
[docs] def popr(self): """Exit a layer of recursion, and return new level.""" self.recursion -= 1 return self.recursion
[docs] def pushc(self): """Enter a new tracer-aware function.""" if self.call == 0: self.reset_trace() self.call += 1 return self.call
[docs] def popc(self): """Exit a tracer-aware function.""" self.call -= 1 return self.call
[docs] def reset_trace(self): self.call = 0 self.counter = 0 self.recursion = 0 self.stored = []
# filter property
[docs] def setfilter(self, filter): self.__filter = compile(" and ".join(["(%s)" % f for f in filter]), "<string>", "eval")
[docs] def getfilter(self): return self.__filter
filter = property(getfilter, setfilter)
# End of class TracePeaks
[docs] def settracer(**kwds): global tracer tracer = TracePeaks(**kwds) return tracer
# Default tracer never emits tracer = settracer()