import math
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from diffpy.utils.scattering_objects.diffraction_objects import Diffraction_object
RADIUS_MM = 1
N_POINTS_ON_DIAMETER = 300
TTH_GRID = np.arange(1, 180.1, 0.1)
CVE_METHODS = ["brute_force", "polynomial_interpolation"]
# pre-computed datasets for polynomial interpolation (fast calculation)
MUD_LIST = [0.5, 1, 2, 3, 4, 5, 6]
CWD = Path(__file__).parent.resolve()
MULS = np.loadtxt(CWD / "data" / "inverse_cve.xy")
COEFFICIENT_LIST = np.array(pd.read_csv(CWD / "data" / "coefficient_list.csv", header=None))
INTERPOLATION_FUNCTIONS = [interp1d(MUD_LIST, coefficients, kind="quadratic") for coefficients in COEFFICIENT_LIST]
[docs]
class Gridded_circle:
def __init__(self, radius=1, n_points_on_diameter=N_POINTS_ON_DIAMETER, mu=None):
self.radius = radius
self.npoints = n_points_on_diameter
self.mu = mu
self.distances = []
self.muls = []
self._get_grid_points()
def _get_grid_points(self):
"""
given a radius and a grid size, return a grid of points to uniformly sample that circle
"""
xs = np.linspace(-self.radius, self.radius, self.npoints)
ys = np.linspace(-self.radius, self.radius, self.npoints)
self.grid = {(x, y) for x in xs for y in ys if x**2 + y**2 <= self.radius**2}
self.total_points_in_grid = len(self.grid)
[docs]
def set_distances_at_angle(self, angle):
"""
given an angle, set the distances from the grid points to the entry and exit coordinates
Parameters
----------
angle float
the angle in degrees
Returns
-------
the list of distances containing total distance, primary distance and secondary distance
"""
self.primary_distances, self.secondary_distances, self.distances = [], [], []
for coord in self.grid:
distance, primary, secondary = self.get_path_length(coord, angle)
self.distances.append(distance)
self.primary_distances.append(primary)
self.secondary_distances.append(secondary)
[docs]
def set_muls_at_angle(self, angle):
"""
compute muls = exp(-mu*distance) for a given angle
Parameters
----------
angle float
the angle in degrees
Returns
-------
an array of floats containing the muls corresponding to each angle
"""
mu = self.mu
self.muls = []
if len(self.distances) == 0:
self.set_distances_at_angle(angle)
for distance in self.distances:
self.muls.append(np.exp(-mu * distance))
def _get_entry_exit_coordinates(self, coordinate, angle):
"""
get the coordinates where the beam enters and leaves the circle for a given angle and grid point
Parameters
----------
grid_point tuple of floats
the coordinates of the grid point
angle float
the angle in degrees
radius float
the radius of the circle in units of inverse mu
it is calculated in the following way:
For the entry coordinate, the y-component will be the y of the grid point and the x-component will be minus
the value of x on the circle at the height of this y.
For the exit coordinate:
Find the line y = ax + b that passes through grid_point at angle angle
The circle is x^2 + y^2 = r^2
The exit point is where these are simultaneous equations
x^2 + y^2 = r^2 & y = ax + b
x^2 + (ax+b)^2 = r^2
=> x^2 + a^2x^2 + 2abx + b^2 - r^2 = 0
=> (1+a^2) x^2 + 2abx + (b^2 - r^2) = 0
to find x_exit we find the roots of these equations and pick the root that is above y-grid
then we get y_exit from y_exit = a*x_exit + b
Returns
-------
(1) the coordinate of the entry point and (2) of the exit point of a beam entering horizontally
impinging on a coordinate point that lies in the circle and then exiting at some angle, angle.
"""
epsilon = 1e-7 # precision close to 90
angle = math.radians(angle)
xgrid = coordinate[0]
ygrid = coordinate[1]
entry_point = (-math.sqrt(self.radius**2 - ygrid**2), ygrid)
if not math.isclose(angle, math.pi / 2, abs_tol=epsilon):
b = ygrid - xgrid * math.tan(angle)
a = math.tan(angle)
xexit_root1, xexit_root2 = np.roots((1 + a**2, 2 * a * b, b**2 - self.radius**2))
yexit_root1 = a * xexit_root1 + b
yexit_root2 = a * xexit_root2 + b
if yexit_root2 >= yexit_root1: # We pick the point above
exit_point = (xexit_root2, yexit_root2)
else:
exit_point = (xexit_root1, yexit_root1)
else:
exit_point = (xgrid, math.sqrt(self.radius**2 - xgrid**2))
return entry_point, exit_point
[docs]
def get_path_length(self, grid_point, angle):
"""
return the path length
This is the pathlength of a horizontal line entering the circle at the
same height to the grid point then exiting at angle angle
Parameters
----------
grid_point double of floats
the coordinate inside the circle
angle float
the angle of the output beam
radius
the radius of the circle
Returns
-------
floats total distance, primary distance and secondary distance
"""
# move angle a tad above zero if it is zero to avoid it having the wrong sign due to some rounding error
angle_delta = 0.000001
if angle == float(0):
angle = angle + angle_delta
entry, exit = self._get_entry_exit_coordinates(grid_point, angle)
primary_distance = math.dist(grid_point, entry)
secondary_distance = math.dist(grid_point, exit)
total_distance = primary_distance + secondary_distance
return total_distance, primary_distance, secondary_distance
def _cve_brute_force(diffraction_data, mud):
"""
compute cve for the given mud on a global grid using the brute-force method
assume mu=mud/2, given that the same mu*D yields the same cve and D/2=1
"""
mu_sample_invmm = mud / 2
abs_correction = Gridded_circle(mu=mu_sample_invmm)
distances, muls = [], []
for angle in TTH_GRID:
abs_correction.set_distances_at_angle(angle)
abs_correction.set_muls_at_angle(angle)
distances.append(sum(abs_correction.distances))
muls.append(sum(abs_correction.muls))
distances = np.array(distances) / abs_correction.total_points_in_grid
muls = np.array(muls) / abs_correction.total_points_in_grid
cve = 1 / muls
cve_do = Diffraction_object(wavelength=diffraction_data.wavelength)
cve_do.insert_scattering_quantity(
TTH_GRID,
cve,
"tth",
metadata=diffraction_data.metadata,
name=f"absorption correction, cve, for {diffraction_data.name}",
wavelength=diffraction_data.wavelength,
scat_quantity="cve",
)
return cve_do
def _cve_polynomial_interpolation(diffraction_data, mud):
"""
compute cve using polynomial interpolation method, raise an error if mu*D is out of the range (0.5 to 6)
"""
if mud > 6 or mud < 0.5:
raise ValueError(
f"mu*D is out of the acceptable range (0.5 to 6) for polynomial interpolation. "
f"Please rerun with a value within this range or specifying another method from {* CVE_METHODS, }."
)
coeff_a, coeff_b, coeff_c, coeff_d, coeff_e = [
interpolation_function(mud) for interpolation_function in INTERPOLATION_FUNCTIONS
]
muls = np.array(coeff_a * MULS**4 + coeff_b * MULS**3 + coeff_c * MULS**2 + coeff_d * MULS + coeff_e)
cve = 1 / muls
cve_do = Diffraction_object(wavelength=diffraction_data.wavelength)
cve_do.insert_scattering_quantity(
TTH_GRID,
cve,
"tth",
metadata=diffraction_data.metadata,
name=f"absorption correction, cve, for {diffraction_data.name}",
wavelength=diffraction_data.wavelength,
scat_quantity="cve",
)
return cve_do
def _cve_method(method):
"""
retrieve the cve computation function for the given method
"""
methods = {
"brute_force": _cve_brute_force,
"polynomial_interpolation": _cve_polynomial_interpolation,
}
if method not in CVE_METHODS:
raise ValueError(f"Unknown method: {method}. Allowed methods are {*CVE_METHODS, }.")
return methods[method]
[docs]
def compute_cve(diffraction_data, mud, method="polynomial_interpolation"):
f"""
compute and interpolate the cve for the given diffraction data and mud using the selected method
Parameters
----------
diffraction_data Diffraction_object
the diffraction pattern
mud float
the mu*D of the diffraction object, where D is the diameter of the circle
method str
the method used to calculate cve, must be one of {* CVE_METHODS, }
Returns
-------
the diffraction object with cve curves
"""
cve_function = _cve_method(method)
abdo_on_global_tth = cve_function(diffraction_data, mud)
global_tth = abdo_on_global_tth.on_tth[0]
cve_on_global_tth = abdo_on_global_tth.on_tth[1]
orig_grid = diffraction_data.on_tth[0]
newcve = np.interp(orig_grid, global_tth, cve_on_global_tth)
cve_do = Diffraction_object(wavelength=diffraction_data.wavelength)
cve_do.insert_scattering_quantity(
orig_grid,
newcve,
"tth",
metadata=diffraction_data.metadata,
name=f"absorption correction, cve, for {diffraction_data.name}",
wavelength=diffraction_data.wavelength,
scat_quantity="cve",
)
return cve_do
[docs]
def apply_corr(diffraction_pattern, absorption_correction):
"""
Apply absorption correction to the given diffraction object modo with the correction diffraction object abdo
Parameters
----------
diffraction_pattern Diffraction_object
the input diffraction object to which the cve will be applied
absorption_correction Diffraction_object
the diffraction object that contains the cve to be applied
Returns
-------
a corrected diffraction object with the correction applied through multiplication
"""
corrected_pattern = diffraction_pattern * absorption_correction
return corrected_pattern