Source code for diffpy.srmise.dataclusters

#!/usr/bin/env python
# SrMise            by Luke Granlund
#                   (c) 2014 trustees of the Michigan State University
#                   (c) 2024 trustees of Columbia University in the City of New York
#                   All rights reserved.
# File coded by:    Luke Granlund
# See LICENSE.txt for license information.
"""Defines class to partition sequences representing the x and y axis into peak-like clusters."""

import logging

import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger("diffpy.srmise")

[docs] class DataClusters: """Find clusters corresponding to peaks in the PDF (y-array) DataClusters determines which points in inter-atomic distane, r, correspond to peaks in the PDF. The division between clusters is contiguous, with borders between clusters likely near relative minima in the data. Clusters are iteratively formed around points with the largest PDF values. New clusters are added only when the unclustered data point under consideration is greater than a given distance (the 'resolution') from the nearest existing cluster. Data members ------------ x : array The array of r values. y : sequence of y values The array of PDF values, G(r) res : float The clustering resolution, i.e., the number of distance another point has to be away from the center of an existing cluster to before a new cluster is formed. A value of zero allows every point to be a cluster. data_order : array The array of x, y indices ordered by decreasing y clusters : ndarray The array of cluster ranges current_idx : int The index of data_order currently considered """ def __init__(self, x, y, res): """Constructor Parameters ---------- x : array The array of r values. y : sequence of y values The array of PDF values, G(r) res : float The clustering resolution, i.e., the distance another point has to be away from the center of an existing cluster to before a new cluster is formed. A value of zero allows every point to be cluster. """ # Track internal state of clustering. self.INIT = 0 self.READY = 1 self.CLUSTERING = 2 self.DONE = 3 self._clear() self._setdata(x, y, res) return # This iterator operates not over found clusters, but over the process of # clustering. This behavior could cause confusion and should perhaps be # altered. def __iter__(self): return self def __eq__(self, other): if not isinstance(other, DataClusters): return False return ( np.array_equal(self.x, other.x) and np.array_equal(self.y, other.y) and np.array_equal(self.data_order, other.data_order) and np.array_equal(self.clusters, other.clusters) and self.res == other.res and self.current_idx == other.current_idx and self.lastcluster_idx == other.lastcluster_idx and self.lastpoint_idx == other.lastpoint_idx and self.status == other.status and self.INIT == other.INIT and self.READY == other.READY and self.CLUSTERING == other.CLUSTERING and self.DONE == other.DONE ) def _clear(self): """ Clear all data and reset the cluster object to a transient initial state. The purpose of this method is to provide a clean state before creating new clustering operations. The object is updated in-place and no new instance is returned. Returns ------- None """ self.x = np.array([]) self.y = np.array([]) self.data_order = np.array([]) self.clusters = np.array([[]]) self.res = 0 self.current_idx = 0 self.lastcluster_idx = None self.lastpoint_idx = None self.status = self.INIT return
[docs] def reset_clusters(self): """Reset all progress on clustering.""" self.clusters = np.array([[self.data_order[-1], self.data_order[-1]]]) self.current_idx = self.data_order.size - 1 self.lastcluster_idx = 0 self.lastpoint_idx = self.data_order[-1] if self.status != self.INIT: self.status = self.READY return
def _setdata(self, x, y, res): """Assign data members for x- and y-coordinates, and resolution. Parameters ---------- x : array The array of r values. y : sequence of y values The array of PDF values, G(r) res : float The clustering resolution, i.e., the distance another point has to be away from the center of an existing cluster to before a new cluster is formed. A value of zero allows every point to be cluster. """ if len(x) != len(y): raise ValueError("Sequences x and y must have the same length.") if res < 0: raise ValueError( "Value of resolution parameter is less than zero. Please rerun specifying a non-negative res" ) self.x = x self.y = y self.res = res if x.size == 0: self.data_order = np.array([]) self.clusters = np.array([[]]) self.current_idx = 0 self.lastpoint_idx = None self.status = self.INIT else: self.data_order = self.y.argsort() self.clusters = np.array([[self.data_order[-1], self.data_order[-1]]]) self.current_idx = len(self.data_order) - 1 self.lastpoint_idx = self.data_order[-1] self.status = self.READY self.lastcluster_idx = None return def __next__(self): """Cluster point with largest y-coordinate left, returning self. next() always adds at least one additional point to the existing cluster, or raises an exception if all points have been clustered. """ if self.status == self.INIT: raise Exception("Cannot cluster next point while status is INIT.") if self.status == self.READY: self.status = self.CLUSTERING elif self.status == self.DONE: raise StopIteration # Find next unclustered point, if one exists. nearest_cluster = [0, 0.0] while nearest_cluster[1] == 0.0 and self.current_idx > 0: self.current_idx += -1 test_idx = self.data_order[self.current_idx] nearest_cluster = self.find_nearest_cluster(test_idx) # Check status if self.current_idx <= 0: self.status = self.DONE if nearest_cluster[1] == 0.0: # Last point already clustered, so stop immediately. raise StopIteration self.lastpoint_idx = test_idx if np.abs(nearest_cluster[1]) <= self.res: # Add to an existing cluster self.lastcluster_idx = nearest_cluster[0] if test_idx < self.clusters[nearest_cluster[0], 0]: self.clusters[nearest_cluster[0], 0] = test_idx else: self.clusters[nearest_cluster[0], 1] = test_idx else: # Make a new cluster if nearest_cluster[1] < 0: # Insert left of nearest cluster self.lastcluster_idx = nearest_cluster[0] else: # insert right of nearest cluster self.lastcluster_idx = nearest_cluster[0] + 1 self.clusters = np.insert(self.clusters, int(self.lastcluster_idx), [test_idx, test_idx], 0) return self
[docs] def makeclusters(self): """Cluster all remaining data.""" for i in self: pass return
[docs] def find_nearest_cluster2(self, x): """Return [cluster index, distance] for cluster nearest to x. Parameters ---------- x : ndarray Coordinate of point of interest The distance is positive/negative if the point is right/left of the nearest cluster. If the point is within an existing cluster then distance = 0. Returns ------- array-like The index of the nearest cluster, and the distance for cluster nearest to x. None if no cluster """ if self.status == self.INIT: raise Exception("Cannot cluster next point while status is INIT.") idx = np.searchsorted(self.x, x) if idx > 0 or idx >= len(self.x): return self.find_nearest_cluster(idx) else: # Choose adjacent index nearest to x if (self.x[idx] - x) < (x - self.x[idx - 1]): return self.find_nearest_cluster(idx) else: return self.find_nearest_cluster(idx - 1)
[docs] def find_nearest_cluster(self, idx): """Return [cluster index, distance] for cluster nearest to x[idx]. The distance is positive/negative if the point is right/left of the nearest cluster. If the point is within an existing cluster then distance = 0. Parameters ---------- idx : array-like index of point in self.x of interest. Returns ------- array-like The array of cluster index and the distacne to the nearest cluster. None if no clusters exist. """ if self.status == self.INIT: raise Exception("Cannot cluster next point while status is INIT.") clusters_flat = self.clusters.flatten() if len(clusters_flat) == 0: return None flat_idx = clusters_flat.searchsorted(idx) near_idx = flat_idx / 2 if flat_idx == len(clusters_flat): # test_idx is right of the last cluster return [near_idx - 1, self.x[idx] - self.x[self.clusters[-1, 1]]] if clusters_flat[flat_idx] == idx or flat_idx % 2 == 1: # idx is within some cluster return [near_idx, 0.0] if flat_idx == 0: # idx is left of the first cluster return [near_idx, self.x[idx] - self.x[self.clusters[0, 0]]] # Calculate which of the two nearest clusters is closer distances = np.array( [ self.x[idx] - self.x[self.clusters[int(near_idx) - 1, 1]], self.x[idx] - self.x[self.clusters[int(near_idx), 0]], ] ) if distances[0] < np.abs(distances[1]): return [near_idx - 1, distances[0]] else: return [near_idx, distances[1]]
[docs] def cluster_is_full(self, cluster_idx): """Return whether the given cluster can grow. A cluster is full if no unclustered points remain adjacent to its boundaries. Parameters ---------- cluster_idx : array-like The index of the cluster to test Returns ------- bools True if the cluster is full, False otherwise """ if cluster_idx > 0: low = self.clusters[cluster_idx - 1, 1] + 1 else: low = 0 if cluster_idx < len(self.clusters) - 1: high = self.clusters[cluster_idx + 1, 0] - 1 else: high = len(self.data_order) - 1 return self.clusters[cluster_idx, 0] == low and self.clusters[cluster_idx, 1] == high
[docs] def combine_clusters(self, combine): """Combine clusters specified by each subarray of cluster indices. Clusters to combine must be contiguous, increasing, and have no unclustered points between them. Parameters ---------- combine : ndarray [[leftmost_idx1, ..., rightmost_idx1], ...] Returns ------- None """ # Ensure that the same clusters aren't combined multiple times. combine_flat = np.array(combine).ravel() if len(combine_flat) != len(np.unique(combine_flat)): raise ValueError("Cannot combine a single cluster multiple times.") for c in combine: # Test that all clusters are contiguous and adjacent first = c[0] for i in range(c[0], c[-1]): if c[i + 1 - first] - 1 != c[i - first]: raise ValueError( "".join( [ "Clusters ", str(c[i]), " and ", str(c[i + 1]), " are not contiguous and/or increasing.", ] ) ) if self.clusters[i + 1, 0] - self.clusters[i, 1] != 1: raise ValueError( "".join( [ "Clusters ", str(c[i]), " and ", str(c[i + 1]), " have unclustered points between them.", ] ) ) # update cluster endpoints self.clusters[c[0], 1] = self.clusters[c[-1], 1] todelete = np.array([c[1:] for c in combine]).ravel() self.clusters = np.delete(self.clusters, todelete, 0)
[docs] def find_adjacent_clusters(self): """Return all cluster indices with no unclustered points between them. Return array([[leftmost_idx1,...,rightmost_idx1],...]) such that there are no unclustered points between each element in subarray of clusters (inclusive). If no such clusters exist, returns an empty array. """ if self.status == self.INIT: raise Exception("Cannot cluster next point while status is INIT.") adj = [] left_idx = 0 while left_idx < len(self.clusters) - 1: while ( left_idx < len(self.clusters) - 1 and self.clusters[left_idx + 1, 0] - self.clusters[left_idx, 1] != 1 ): left_idx += 1 # Not left_idx+1 since left_idx=len(self.clusters)-2 even if no # clusters are actually adjacent. right_idx = left_idx while ( right_idx < len(self.clusters) - 1 and self.clusters[right_idx + 1, 0] - self.clusters[right_idx, 1] == 1 ): right_idx += 1 if right_idx > left_idx: adj.append(range(left_idx, right_idx + 1)) left_idx = right_idx + 1 # set for next possible left_idx return np.array(adj)
[docs] def cut(self, idx): """Return slice(s) for data given cluster index (or indices). Parameters idx - Cluster index (or sequence of indices). """ data_ids = self.clusters[idx] if len(data_ids) == data_ids.size: # idx is a scalar, so give single slice object return slice(data_ids[0], data_ids[1] + 1) else: # idx is a list/slice, so give list of slice objects return [slice(c[0], c[1] + 1) for c in data_ids]
[docs] def cluster_boundaries(self): """Return sequence with (x,y) of all cluster boundaries.""" boundaries = [] for cluster in self.clusters: xlo = np.mean(self.x[cluster[0] - 1 : cluster[0] + 1]) ylo = np.mean(self.y[cluster[0] - 1 : cluster[0] + 1]) xhi = np.mean(self.x[cluster[1] : cluster[1] + 2]) yhi = np.mean(self.y[cluster[1] : cluster[1] + 2]) boundaries.append((xlo, ylo)) boundaries.append((xhi, yhi)) return np.unique(boundaries)
[docs] def plot(self, *args, **kwds): """Plot the data with vertical lines at the cluster divisions. args and kwds passed to matplotlib.plot() """ plt.figure() ax = plt.subplot(111) plt.ion() plt.plot(self.x, self.y, *args, **kwds) plt.ioff() boundaries = self.cluster_boundaries() (ymin, ymax) = ax.get_ylim() for b in boundaries: plt.axvline(b[0], 0, (b[1] - ymin) / (ymax - ymin), color="k") plt.ion() ax.figure.canvas.draw() return
[docs] def animate(self): """Animate clustering. Restores state when complete.""" clusters = self.clusters current_idx = self.current_idx lastcluster_idx = self.lastcluster_idx lastpoint_idx = self.lastpoint_idx status = self.status self.reset_clusters() fig, ax = plt.subplots() canvas = fig.canvas background = canvas.copy_from_bbox(ax.bbox) ymin, ymax = ax.get_ylim() all_lines = [] for i in self: canvas.restore_region(background) boundaries = self.cluster_boundaries() for i, b in enumerate(boundaries): height = (b[1] - ymin) / (ymax - ymin) if i < len(all_lines): all_lines[i].set_xdata([b[0], b[0]]) all_lines[i].set_ydata([0, height]) ax.draw_artist(all_lines[i]) else: line = plt.axvline(b[0], 0, height, color="k", animated=True) ax.draw_artist(line) all_lines.append(line) canvas.blit(ax.bbox) self.clusters = clusters self.current_idx = current_idx self.lastcluster_idx = lastcluster_idx self.lastpoint_idx = lastpoint_idx self.status = status return
# End of class DataClusters # simple test code if __name__ == "__main__": x = np.array([-2.0, -1.5, -1.0, -0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]) y = np.array( [ 0.0183156, 0.105399, 0.36788, 0.778806, 1.00012, 0.780731, 0.386195, 0.210798, 0.386195, 0.780731, 1.00012, 0.778806, 0.36788, 0.105399, 0.0183156, ] ) testcluster = DataClusters(x, y, 0.1) testcluster.makeclusters() print(testcluster.clusters) adj = testcluster.find_adjacent_clusters() print(adj) if len(adj) > 0: testcluster.combine_clusters(adj) print(testcluster.clusters)