#This is part of the source code for the Paineira Graphical User Interface - Iguape
#The code is distributed under the GNU GPL-3.0 License. Please refer to the main page (https://github.com/cnpem/iguape) for more information
"""
This module defines the Monitor class (FolderMonitor). It was built to track and read a given folder for new XRD data. It depends on the 'iguape_filelist.txt' text file.
It was built to work only for Paineira XRD Data, but it can easily be adjusted for other situations.
"""
import time, os, math, sys
import numpy as np
import lmfit as lm
from lmfit.models import PseudoVoigtModel, LinearModel, GaussianModel
import pandas as pd
from scipy.signal import find_peaks
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QFileDialog
# --- Monitor - Reading a '.txt' file for new data --- #
[docs]
class FolderMonitor(QThread):
    """
    The Folder Monitor operates by tracking new or existing data in a specified folder.

    This class inherits the QThread class from PyQt. By reading the 'iguape_filelist.txt' file, it can track
    new or existing data files in the specified folder. It reads each data file and stores a summary row in a
    pandas DataFrame. If a fitting interval is specified, it fits the data to the selected model and stores the
    fitting parameters in another DataFrame.

    As read by :meth:`run`, the file list is consumed as: one leading line that is ignored, followed by pairs of
    lines (data file name, integer status). Monitoring continues while the status equals 1.

    Parameters
    ----------
    folder_path (str): Path to the data folder.
    fit_interval (list): 2theta interval selected to perform the peak fit.
    """

    # Emitted once per registered data file with the one-row DataFrame describing it.
    new_data_signal = pyqtSignal(pd.DataFrame)

    # Seconds to wait before polling the file list again when no complete new entry is available.
    POLL_INTERVAL = 0.1

    # (column position, column name) of the extra fit_data columns used by the two-peak model.
    _SECOND_PEAK_COLUMNS = ((1, 'dois_theta_0_#2'), (3, 'fwhm_#2'), (5, 'area_#2'))

    def __init__(self, folder_path, fit_interval=None):
        """
        The constructor for the FolderMonitor class. It defines all the flags necessary for the class to work,
        like the folder path, the fit interval, the fit model and the DataFrames.

        Parameters
        ----------
        folder_path (str): Path to the data folder.
        fit_interval (list, optional): 2theta interval selected to perform the peak fit. Default is None
        """
        super().__init__()
        self.folder_path = folder_path
        self.fit_interval = fit_interval
        self.fit_model = 'PseudoVoigt'
        # NOTE: the attribute name carries a typo ("sginal"); it is kept because code outside
        # this class may read it.
        self.kelvin_sginal = False
        # Peak-search settings for the two-peak model. They start at the defaults of
        # peak_fit_split_gaussian so that run() works even if set_height()/set_distance()
        # were never called.
        self.height = 1e+09
        self.distance = 35
        self.data_frame = pd.DataFrame(columns=['file_name', 'temp', 'file_index', 'max'])
        self.fit_data = pd.DataFrame(columns=['dois_theta_0', 'fwhm', 'area', 'temp', 'file_index', 'R-squared'])

    def run(self):
        """
        The run method is the main method of the FolderMonitor class. It reads the 'iguape_filelist.txt' file
        tracking all the XRD data in the folder. Then it reads the data and stores it as DataFrames. It also
        performs the peak fit, if a fit interval is specified.

        An entry is processed only once both its file-name line and its status line can be read; until then the
        file list is polled again after POLL_INTERVAL seconds. Errors raised while processing an entry are
        printed and the entry is skipped, so a single bad file neither stops the thread nor gets registered
        repeatedly.
        """
        list_path = os.path.join(self.folder_path, 'iguape_filelist.txt')
        reading_status = 1
        i = 0
        print(f'Monitoring folder: {self.folder_path}')
        print('Waiting for XRD data! Please, wait')
        while reading_status == 1:
            try:
                with open(list_path, "r") as file:
                    lines = file.read().splitlines()
                line = lines[i + 1]
                next_status = int(lines[i + 2])
            except (OSError, IndexError, ValueError):
                # The list file or the next (file name, status) pair is not fully written yet.
                time.sleep(self.POLL_INTERVAL)
                continue
            try:
                self._process_entry(line)
            except Exception as e:
                # Report and move on: retrying the same entry would register it again.
                print(f"An error occurred while processing {line}: {e}. Skipping this file.")
            reading_status = next_status
            i += 2

    def _process_entry(self, line):
        """Read one listed data file, register and emit its summary row, then fit it if requested."""
        file_path = os.path.join(self.folder_path, line)
        theta, intensity, temp, kelvin_signal = data_read(file_path)
        self.kelvin_sginal = kelvin_signal
        file_index = counter()
        new_data = pd.DataFrame({'file_name': [file_path], 'temp': [temp], 'file_index': [file_index], 'max': [intensity.max()]})
        self.data_frame = pd.concat([self.data_frame, new_data], ignore_index=True)
        self.new_data_signal.emit(new_data)
        print(f"New data created at: {self.folder_path}. File name: {line}")
        if self.fit_interval:
            self._store_fit(theta, intensity, temp, file_index)

    def _store_fit(self, theta, intensity, temp, file_index):
        """Fit one pattern with the selected model and append the resulting row to fit_data."""
        # The fitting functions use id[0] and id[1] as index and temperature in their report header.
        fit_id = (file_index, temp)
        if self.fit_model == 'PseudoVoigt':
            fit = peak_fit(theta, intensity, self.fit_interval, fit_id)
            if fit is None:
                # The fitting function already printed the reason.
                return
            row = {'dois_theta_0': [fit[0]], 'fwhm': [fit[1]], 'area': [fit[2]],
                   'temp': [temp], 'file_index': [file_index], 'R-squared': [fit[3]]}
        else:
            fit = peak_fit_split_gaussian(theta, intensity, self.fit_interval, fit_id,
                                          height=self.height, distance=self.distance)
            if fit is None:
                return
            # Create the second-peak columns once, next to their first-peak counterparts;
            # rows stored before the model was switched are left as NaN.
            for position, column in self._SECOND_PEAK_COLUMNS:
                if column not in self.fit_data.columns:
                    self.fit_data.insert(min(position, len(self.fit_data.columns)), column, np.nan)
            row = {'dois_theta_0': [fit[0][0]], 'dois_theta_0_#2': [fit[0][1]],
                   'fwhm': [fit[1][0]], 'fwhm_#2': [fit[1][1]],
                   'area': [fit[2][0]], 'area_#2': [fit[2][1]],
                   'temp': [temp], 'file_index': [file_index], 'R-squared': [fit[3]]}
        self.fit_data = pd.concat([self.fit_data, pd.DataFrame(row)], ignore_index=True)

    def set_fit_interval(self, interval):
        """
        Method for defining the fit interval.

        Parameters
        ----------
        interval (list): The 2theta interval to be used for the peak fitting.
        """
        self.fit_interval = interval

    def set_fit_model(self, model):
        """
        Method for defining the fit model.

        Parameters
        ----------
        model (str): The model to be used for the peak fitting. It can be 'PseudoVoigt' or 'SplitPseudoVoigt'.
            Any value other than 'PseudoVoigt' selects the two-peak (split) fit in run().
        """
        self.fit_model = model

    def set_distance(self, distance):
        """
        Method for defining the minimum distance between the two peak centers. Only used when the model is
        Split PseudoVoigt.

        Parameters
        ----------
        distance (float): The minimum distance between the two peak centers.
        """
        self.distance = distance

    def set_height(self, height):
        """
        Method for defining the minimum height of the two peaks. Only used when the model is
        Split PseudoVoigt.

        Parameters
        ----------
        height (float): The minimum height of the two peaks.
        """
        self.height = height
# --- Defining the functions for data reading and peak fitting --- #
[docs]
def data_read(path):
    """
    Data reading function.
    It reads the data from a given path and returns the 2theta and Intensity arrays, Temperature and Kelvin Signal tag.

    The file is parsed as comma-separated values with one header row; lines starting with '#' are ignored.
    The first column is returned as 2theta and the second as intensity. Reading is retried (after a short
    pause) until it succeeds, since the file may still be being written.

    The temperature is taken from the file name: the name is split on '_' and a token containing 'Celsius'
    or 'Kelvin' provides the number that precedes the unit. Tokens whose prefix is not a number are ignored.

    Parameters
    ----------
    path (str): Path to the data file.

    Returns
    -------
    x (np.array): 2theta array.
    y (np.array): Intensity array.
    temp (float): Temperature, or None when the file name carries no readable temperature.
    kelvin_signal (bool): Kelvin Signal tag. True when the temperature was given in Kelvin, otherwise None.
    """
    # Only the file access is retried; a malformed file name can never fix itself,
    # so name parsing is kept outside of this loop.
    while True:
        try:
            data = pd.read_csv(path, sep=',', header=0, comment="#")
            x = np.array(data.iloc[:, 0])
            y = np.array(data.iloc[:, 1])
            break
        except pd.errors.EmptyDataError:
            print(f"Warning: Empty file encountered: {path}. Trying to read the data again!")
        except Exception as e:
            print(f"An error occurred while reading data: {e}. Trying to read the data again!")
        # Brief pause so a file that is not ready yet does not cause a busy loop.
        time.sleep(0.1)

    # os.path.basename honours the platform's separators (the path is built with os.path.join).
    file_name = os.path.basename(path)
    temp = None
    kelvin_signal = None
    for token in file_name.split('_'):
        if 'Celsius' in token:
            unit = 'Celsius'
        elif 'Kelvin' in token:
            unit = 'Kelvin'
        else:
            continue
        try:
            value = float(token.split(unit)[0])  # Getting the temperature
        except ValueError:
            # e.g. a token that is just the unit name, without a number in front of it
            continue
        temp = value
        if unit == 'Kelvin':
            kelvin_signal = True
    return x, y, temp, kelvin_signal
# --- Peak fitting functions --- #
[docs]
def peak_fit(theta, intensity, interval, id=None, bkg='Linear', pars=None):
    """
    Peak fitting function for the PseudoVoigt model.
    Given a set of 2theta and Intensity arrays, it fits the data to the
    PseudoVoigt model (plus a linear background) in the selected 2theta interval.
    It returns the fitting parameters.

    Parameters
    ----------
    theta (np.array): 2theta array.
    intensity (np.array): Intensity array.
    interval (list): 2theta interval for the peak fitting. Both bounds are exclusive.
    id (sequence, optional): Index (id[0]) and temperature (id[1]) of the XRD data, used only in the
        header of the printed fit report. Default is None (generic header).
    bkg (str, optional): Background model. Default is 'Linear'. Currently not used: a linear
        background is always added.
    pars (lmfit.Parameters, optional): Starting parameters. When given, they are modified in place
        ('fraction' is reset to 0.5 and the background guess is merged in). When None, the
        starting values are guessed from the data. Default is None.

    Returns
    -------
    dois_theta_0 (float): Peak center.
    fwhm (float): Full Width at Half Maximum.
    area (float): Area under the peak.
    r_squared (float): R-squared value of the fit.
    out (lmfit.ModelResult): ModelResult. Inherited from the lmfit package.
    comps (dict): Fitting components such as the background and model function. Inherited from the lmfit package.
    theta_fit (np.array): 2theta array for the fitting interval.
    params (lmfit.Parameters): Best-fit parameters (out.params).

    If the fit raises a ValueError or TypeError, a message is printed and None is returned.
    """
    try:
        # Slicing the data for the selected peak fitting interval #
        theta = np.asarray(theta)
        intensity = np.asarray(intensity)
        in_interval = (theta > interval[0]) & (theta < interval[1])
        theta_fit = theta[in_interval]
        intensity_fit = intensity[in_interval]
        # Building the Voigt model with lmfit #
        mod = PseudoVoigtModel(nan_policy='omit')
        if pars is None:
            pars = mod.guess(data=intensity_fit, x=theta_fit)
        else:
            pars['fraction'].value = 0.5
        background = LinearModel(prefix='bkg_')
        pars.update(background.guess(data=intensity_fit, x=theta_fit))
        mod += background
        out = mod.fit(intensity_fit, pars, x=theta_fit)  # Fitting the data to the Voigt model #
        comps = out.eval_components(x=theta_fit)
        if id is None:
            header = "Fit report"
        else:
            header = f"Fit report for XRD #{id[0]} - {id[1]}°C"
        print(header, out.fit_report(), sep='\n')
        # Getting the parameters from the optimal fit #
        dois_theta_0 = out.params['center'].value
        fwhm = out.params['fwhm'].value
        area = out.params['amplitude'].value
        r_squared = out.rsquared
        return dois_theta_0, fwhm, area, r_squared, out, comps, theta_fit, out.params
    except (ValueError, TypeError) as e:
        # A tuple is required here: "ValueError or TypeError" evaluates to ValueError alone.
        print(f'Fitting error, please wait: {e}! Please select a new fitting interval')
        return None
[docs]
def pseudo_voigt(x, amplitude, center, sigma, eta):
    r"""
    PseudoVoigt profile: a weighted sum of a Lorentzian and a Gaussian sharing the same center and amplitude.

    Parameters
    ----------
    x (np.array): 2theta array.
    amplitude (float): Peak amplitude.
    center (float): Peak center.
    sigma (float): Sigma value (width parameter).
    eta (float): Eta value (mixing parameter); weight of the Lorentzian term.

    Returns
    -------
    np.array: PseudoVoigt function evaluated at x.

    Notes
    -----
    The PseudoVoigt function is defined as:

    .. math::
        PV(x; A, \mu, \sigma, \eta) = \eta L(x; A, \mu, \sigma) + (1 - \eta) G(x; A, \mu, \sigma_g)

    .. math::
        L(x; A, \mu, \sigma) = \frac{A}{\pi} \left[ \frac{\sigma}{(x-\mu)^2 + \sigma^{2}} \right]

    .. math::
        G(x; A, \mu, \sigma_g) = \frac{A}{\sigma_g\sqrt{2\pi}}e^{\left[ \frac{-(x - \mu)^{2}}{2\sigma_g^{2}} \right]}

    where the Gaussian width is :math:`\sigma_g = \sigma / \sqrt{2\ln 2}`, so that both terms have a
    full width at half maximum of :math:`2\sigma`.
    """
    squared_offset = (x - center)**2

    # Lorentzian term
    lorentz_part = ((amplitude/math.pi)*sigma)/(squared_offset + sigma**2)

    # Gaussian term, evaluated with the rescaled width
    width_g = sigma/math.sqrt(2*math.log(2))
    gauss_norm = amplitude/(width_g*math.sqrt(2*math.pi))
    gauss_part = gauss_norm*np.exp(-squared_offset/(2*width_g** 2))

    return eta*lorentz_part + (1 - eta)*gauss_part
[docs]
def split_pseudo_voigt(x, amp1, cen1, sigma1, eta1, amp2, cen2, sigma2, eta2):
    r"""
    Split PseudoVoigt function: the sum of two independent PseudoVoigt peaks.

    :param x: 2theta array
    :type x: np.array
    :param amp1: Peak amplitude for the first peak
    :type amp1: float
    :param cen1: Peak center for the first peak.
    :type cen1: float
    :param sigma1: Sigma value for the first peak.
    :type sigma1: float
    :param eta1: Eta value for the first peak (mixing parameter).
    :type eta1: float
    :param amp2: Peak amplitude for the second peak.
    :type amp2: float
    :param cen2: Peak center for the second peak.
    :type cen2: float
    :param sigma2: Sigma value for the second peak.
    :type sigma2: float
    :param eta2: Eta value for the second peak (mixing parameter).
    :type eta2: float
    :return: Split PseudoVoigt function evaluated at x
    :rtype: np.array

    Notes
    -----
    The Split PseudoVoigt function is defined as:

    .. math::
        SPV(x; A1, \mu1, \sigma1, \eta1, A2, \mu2, \sigma2, \eta2) = PV1(x; A1, \mu1, \sigma1, \eta1) + PV2(x; A2, \mu2, \sigma2, \eta2)
    """
    first_peak = pseudo_voigt(x, amp1, cen1, sigma1, eta1)
    second_peak = pseudo_voigt(x, amp2, cen2, sigma2, eta2)
    return first_peak + second_peak
[docs]
def peak_fit_split_gaussian(theta, intensity, interval, id=None, bkg='Linear', height=1e+09, distance=35, prominence=50, pars=None):
    """
    Peak fitting function for the Split PseudoVoigt model.
    Given a set of 2theta and Intensity arrays, it fits the data to the
    Split PseudoVoigt model (plus a linear background) in the selected 2theta interval.
    It returns the fitting parameters.

    :param theta: 2theta array.
    :type theta: np.array
    :param intensity: Intensity array.
    :type intensity: np.array
    :param interval: 2theta interval for the peak fitting. Both bounds are exclusive.
    :type interval: list
    :param id: Index (id[0]) and temperature (id[1]) of the XRD data, used only in the header of
        the printed fit report. Defaults to None (generic header).
    :type id: sequence, optional
    :param bkg: Background Model, defaults to 'Linear'. Currently not used: a linear background is always added.
    :type bkg: str, optional
    :param height: Minimum height of the peaks, defaults to 1e+09
    :type height: float, optional
    :param distance: Minimum distance between the peaks, in number of data points (passed to
        scipy.signal.find_peaks), defaults to 35
    :type distance: int, optional
    :param prominence: Minimum prominence of the peaks, defaults to 50
    :type prominence: int, optional
    :param pars: Starting parameters (as described by lmfit). When given, they are modified in place.
        When None, the two highest peaks found in the interval provide the starting values.
        Defaults to None
    :type pars: lmfit.Parameters, optional
    :return: (centers, fwhms, areas, r_squared, out, comps, theta_fit, out.params), where centers,
        fwhms and areas are two-element lists ordered as (peak 1, peak 2). None is returned, after
        printing a message, when the fit raises a ValueError or TypeError or when fewer than two
        peaks are found.
    :rtype: tuple
    """
    try:
        # Slicing the data for the selected peak fitting interval #
        theta = np.asarray(theta)
        intensity = np.asarray(intensity)
        in_interval = (theta > interval[0]) & (theta < interval[1])
        theta_fit = theta[in_interval]
        intensity_fit = intensity[in_interval]
        # Building the Split PseudoVoigt model with lmfit #
        model = lm.Model(split_pseudo_voigt)
        if pars is None:
            peaks, properties = find_peaks(intensity_fit, height=height, distance=distance, prominence=prominence)
            if len(peaks) < 2:
                # Without two starting peaks there is nothing to fit; report it explicitly
                # through the same message path as any other fitting error.
                raise ValueError(f'only {len(peaks)} peak(s) found in the interval, two are required')
            # Keep the two highest peaks...
            heights = properties['peak_heights']
            tallest = np.argsort(heights)[-2:]
            peak_positions = theta_fit[peaks][tallest]
            peak_heights = heights[tallest]
            # ...and label them so that peak 1 is the one at the lower 2theta.
            left_to_right = np.argsort(peak_positions)
            cen1, cen2 = peak_positions[left_to_right]
            amp1, amp2 = peak_heights[left_to_right]
            # Fixed starting width for both peaks (0.1 divided by 2.355)
            sigma1 = 0.1/2.355
            sigma2 = 0.1/2.355
            pars = model.make_params(amp1=amp1, cen1=cen1, sigma1=sigma1, eta1=0.5, amp2=amp2, cen2=cen2, sigma2=sigma2, eta2=0.5)
        else:
            pars['eta1'].value = 0.5
            pars['eta2'].value = 0.5
        # Keep both centers inside the interval and both mixing parameters in [0, 1]
        pars['cen1'].min, pars['cen1'].max = interval[0], interval[1]
        pars['cen2'].min, pars['cen2'].max = interval[0], interval[1]
        pars['eta1'].min, pars['eta1'].max = 0.0, 1.0
        pars['eta2'].min, pars['eta2'].max = 0.0, 1.0
        background = LinearModel(prefix='bkg_')
        pars.update(background.guess(data=intensity_fit, x=theta_fit))
        model += background
        out = model.fit(intensity_fit, pars, x=theta_fit)
        comps = out.eval_components(x=theta_fit)
        if id is None:
            header = "Fit report"
        else:
            header = f"Fit report for XRD #{id[0]} - {id[1]}°C"
        print(header, out.fit_report(), sep='\n')
        dois_theta_0 = [out.params['cen1'].value, out.params['cen2'].value]
        # The full width at half maximum of each peak is twice its sigma
        fwhm = [2.0*out.params['sigma1'].value, 2.0*out.params['sigma2'].value]
        area = [out.params['amp1'].value, out.params['amp2'].value]
        r_squared = out.rsquared
        return dois_theta_0, fwhm, area, r_squared, out, comps, theta_fit, out.params
    except (ValueError, TypeError) as e:
        print(f'Fitting error, please wait: {e}! Please select a new fitting interval')
        return None
[docs]
def normalize_array(array: np.ndarray) -> np.ndarray:
    """
    Normalizes an array by its maximum value.

    The array is divided element-wise by ``np.max(array)``; no special handling is applied
    when that maximum is zero or when the array is empty.

    :param array: Array to be normalized
    :type array: np.ndarray
    :return: Normalized (by maximum) array
    :rtype: np.ndarray
    """
    return array/np.max(array)
[docs]
def calculate_q_vector(wavelength: float, two_theta: np.ndarray):
    r"""
    Converts 2theta values (in degrees) into the magnitude of the scattering vector Q.

    .. math::
        Q = \frac{4\pi}{\lambda} \sin{\theta}

    :param wavelength: wavelength in Angstroms
    :type wavelength: float
    :param two_theta: 2theta values array
    :type two_theta: np.ndarray
    :return: Q-vector values array
    :rtype: np.ndarray
    """
    # theta is half of the measured 2theta angle; numpy's sine expects radians
    theta_rad = np.deg2rad(two_theta / 2)
    prefactor = 4 * np.pi / wavelength
    return prefactor * np.sin(theta_rad)
# --- A counter function to index the created curves --- #
[docs]
def counter():
    """
    Counter function. Each call advances the running count of XRD data by one and returns it.

    The count is kept in the ``counter.count`` function attribute, which starts at 0,
    so the first call returns 1.

    :return: Index of the XRD data
    :rtype: int
    """
    next_index = counter.count + 1
    counter.count = next_index
    return next_index


counter.count = 0
# Stand-alone entry point: ask for a folder and monitor it inside a Qt event loop.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    path = QFileDialog.getExistingDirectory(None, 'Select the data folder to monitor', '', options=QFileDialog.Options()) # Selection of monitoring folder
    if not path:
        # The dialog returns an empty string when it is cancelled; there is nothing to monitor then.
        print('No folder selected. Exiting.')
        sys.exit(0)
    monitor = FolderMonitor(path)
    monitor.start()
    # Printed right after the thread starts, so this shows the data registered so far (possibly none).
    print(monitor.data_frame)
    sys.exit(app.exec())