#This is part of the source code for the Paineira Graphical User Interface - Iguape
#The code is distributed under the GNU GPL-3.0 License. Please refer to the main page (https://github.com/cnpem/iguape) for more information
"""
This is Monitor Class. It was built to track and read a given Folder for new XRD Data. It's dependent on the iguape_fileslist.txt text file.
It was built to work only for Paineira XRD Data, but it can easily be adjusted for other situations.
"""
import time, os, math
import numpy as np
import lmfit as lm
from lmfit.models import PseudoVoigtModel, LinearModel
import pandas as pd
from scipy.signal import find_peaks
from PyQt5.QtCore import QThread, pyqtSignal
# --- Monitor - Reading a '.txt' file for new data --- #
[docs]
class FolderMonitor(QThread):
"""
The Folder Monitor operates by tracking new or exiting data in a specified folder.
This class inherits the QThread class from PyQt. By reading the 'iguape_filelist.txt' file, it can track new or existing data files in the specified folder.
Later, it reads the data and stores it in a pandas DataFrame. If a fitting interval is specified, it fits the data to a desired model and stores the fitting parameters in another DataFrame.
Parameters
----------
folder_path (str): Path to the data folder.
fit_interval (list): 2theta interval selected to perform the peak fit.
"""
new_data_signal = pyqtSignal(pd.DataFrame)
[docs]
def __init__(self, folder_path, fit_interval=None):
"""
The constructor for the FolderMonitor class. It defines all the flags necessary for the class to work,
like the folder path, the fit interval, the fit model and the DataFrames.
Parameters
----------
folder_path (str): Path to the data folder.
fit_interval (list, optional): 2theta interval selected to perform. Default is None
"""
super().__init__()
self.folder_path = folder_path
self.fit_interval = fit_interval
self.fit_model = 'PseudoVoigt'
self.kelvin_sginal = False
self.data_frame = pd.DataFrame(columns = ['theta', 'intensity', 'temp', 'max', 'file_index'])
self.fit_data = pd.DataFrame(columns=['dois_theta_0', 'fwhm', 'area', 'temp', 'file_index', 'R-squared'])
[docs]
def run(self):
"""
The run method is the main method of the FolderMonitor class. It reads the 'iguape_filelist.txt' file
tracking all the XRD data in the folder. Then it reads the data and stores it as DataFrames. I also performs
the peak fit, if a fit interval is specified.
"""
reading_status = 1
i = 0
print(f'Monitoring folder: {self.folder_path}')
print('Waiting for XRD data! Please, wait')
while reading_status == 1:
while True:
try:
with open(os.path.join(self.folder_path,'iguape_filelist.txt'),"r") as file:
lines = file.read().splitlines()
line = lines[i+1]
data = data_read(os.path.join(self.folder_path,line))
self.kelvin_sginal = data[3]
file_index = counter()
new_data = pd.DataFrame({'theta': [data[0]], 'intensity': [data[1]], 'temp': [data[2]], 'max': [data[1].max()], 'file_index': [file_index]})
self.data_frame = pd.concat([self.data_frame, new_data], ignore_index=True)
self.new_data_signal.emit(new_data)
print(f"New data created at: {self.folder_path}. File name: {lines[i+1]}")
if self.fit_interval:
if self.fit_model == 'PseudoVoigt':
fit = peak_fit(data[0], data[1], self.fit_interval)
new_fit_data = pd.DataFrame({'dois_theta_0': [fit[0]], 'fwhm': [fit[1]], 'area': [fit[2]], 'temp': [data[2]], 'file_index': [file_index], 'R-squared': [fit[3]]})
self.fit_data = pd.concat([self.fit_data, new_fit_data], ignore_index=True)
else:
fit = peak_fit_split_gaussian(data[0], data[1], self.fit_interval, height=self.height, distance = self.distance)
new_fit_data = pd.DataFrame({'dois_theta_0': [fit[0]][0], 'fwhm': [fit[1][0]], 'area': [fit[2][0]], 'temp': [data[2]], 'file_index': [file_index], 'R-squared': [fit[3]]})
self.fit_data = pd.concat([self.fit_data, new_fit_data], ignore_index=True)
self.fit_data.insert(1,'dois_theta_0_#2', [fit[0][1]])
self.fit_data.insert(3, 'fwhm_#2', [fit[1][1]])
self.fit_data.insert(5, 'area_#2', [fit[2][1]])
reading_status = int(lines[i+2])
break
except Exception as e:
#print(f'Exception: {e}')
time.sleep(0.1)
i+=2
[docs]
def set_fit_interval(self, interval):
"""
Method for defining the fit interval.
Parameters
----------
interval (list): The 2theta interval to be used for the peak fitting.
"""
self.fit_interval = interval
[docs]
def set_fit_model(self, model):
"""
Method for defining the fit model.
Parameters
----------
model (str): The model to be used for the peak fitting. It can be 'PseudoVoigt' or 'SplitPseudoVoigt'.
"""
self.fit_model= model
[docs]
def set_distance(self, distance):
"""
Method for defining the minimum distance between the two peak centers. Only used when the model is
Split PseduoVoigt.
Parameters
----------
distance (float): The minimum distance between the two peak centers.
"""
self.distance = distance
[docs]
def set_height(self, height):
"""
Method for defining the minimum height of the two peaks. Only used when the model is
Split PseduoVoigt.
Parameters
----------
height (float): The minimum height of the two peaks.
"""
self.height = height
# --- Defining the functions for data reading and peak fitting --- #
[docs]
def data_read(path):
"""
Data reading function.
It reads the data from a given path and returns the 2theta and Intensity arrays, Temperature and Kelvin Signal tag.
Parameters
----------
path (str): Path to the data file.
Returns
-------
x (np.array): 2theta array.
y (np.array): Intensity array.
temp (float): Temperature.
kelvin_signal (bool): Kelvin Signal tag.
"""
done = False
while not done:
time.sleep(0.1)
try:
dados = pd.read_csv(path, sep=',')
x = np.array(dados.get('2theta (degree)'))
y = np.array(dados.get('Intensity'))
file_name = path.split(sep='/')[len(path.split(sep='/'))-1]
temp = None
kelvin_signal = None
for i in file_name.split(sep='_'):
if 'Celsius' in i:
temp = float(i.split(sep='Celsius')[0]) #Getting the temperature
elif 'Kelvin' in i:
temp = float(i.split(sep='Kelvin')[0])
kelvin_signal = True
done = True
return x, y, temp, kelvin_signal
except pd.errors.EmptyDataError:
print(f"Warning: Empty file encountered: {path}. Trying to read the data again!")
#return None
except Exception as e:
print(f"An error occurred while reading data: {e}. Trying to read the data again!")
#return None
# --- Defining the storaging lists --- #
[docs]
def peak_fit(theta, intensity, interval, bkg = 'Linear'):
"""
Peak fitting function for the PseudoVoigt model.
Given a set of 2theta and Intensity arrays, it fits the data to the
PseudoVoigt model and 2theta interval selected. It returns the fitting parameters.
Parameters
----------
theta (np.array): 2theta array.
intensity (np.array): Intensity array.
interval (list): 2theta interval for the peak fitting.
bkg (str, optional): Background model. Default is 'Linear'.
Returns
-------
dois_theta_0 (float): Peak center.
fwhm (float): Full Width at Half Maximum.
area (float): Area under the peak.
r_squared (float): R-squared value of the fit.
out (lmfit.ModelResult): ModelResult. Inherited from the lmfit package.
comps (dict): Fitting components such as the backgroud and model function. Inherited from the lmfit package.
theta_fit (np.array): 2theta array for the fitting interval.
"""
done = False
while not done:
#time.sleep(0.5)
try:
theta_fit = []
intensity_fit = []
# Slicing the data for the selected peak fitting interval #
for i in range(len(theta)):
if theta[i] > interval[0] and theta[i] < interval[1]:
theta_fit.append(theta[i])
intensity_fit.append(intensity[i])
theta_fit=np.array(theta_fit)
intensity_fit=np.array(intensity_fit)
# Building the Voigt model with lmfit #
mod = PseudoVoigtModel(nan_policy='omit')
pars = mod.guess(data= intensity_fit, x = theta_fit)
background = LinearModel(prefix='bkg_')
pars.update(background.guess(data=intensity_fit, x = theta_fit))
mod += background
out = mod.fit(intensity_fit, pars, x=theta_fit) # Fitting the data to the Voigt model #
comps = out.eval_components(x=theta_fit)
# Getting the parameters from the optimal fit #, bkg= self.bkg_model
dois_theta_0 = out.params['center']*1
fwhm = out.params['fwhm']*1
area = out.params['amplitude']*1
r_squared = out.rsquared
done = True
return dois_theta_0, fwhm, area, r_squared, out, comps, theta_fit
except ValueError or TypeError as e:
print(f'Fitting error, please wait: {e}! Please select a new fitting interval')
done = True
pass
[docs]
def pseudo_voigt(x, amplitude, center, sigma, eta):
r"""
PseudoVoigt function, a linear combination of a Gaussian and a Lorentzian function.
Parameters
----------
x (np.array): 2theta array.
amplitude (float): Peak amplitude.
center (float): Peak center.
sigma (float): Sigma value or standard deviation.
eta (float): Eta value (mixing parameter).
Returns
-------
np.array: PseudoVoigt function.
Notes
-----
The PseudoVoigt function is defined as:
.. math::
PV(x; A, \mu, \sigma, \eta) = \eta L(x; A, \mu, \sigma) + (1 - \eta) G(x; A, \mu, \sigma)
where:
- L(x; A, \mu, \sigma) is the Lorentzian function
- G(x; A, \mu, \sigma) is the Gaussian function
"""
sigma_g = sigma/math.sqrt(2*math.log(2))
gaussian = (amplitude/(sigma_g*math.sqrt(2*math.pi)))*np.exp(-(x-center)**2/(2*sigma_g** 2))
lorentzian = ((amplitude/math.pi)*sigma)/((x - center)**2 + sigma**2)
return eta*lorentzian + (1 - eta)*gaussian
[docs]
def split_pseudo_voigt(x, amp1, cen1, sigma1, eta1, amp2, cen2, sigma2, eta2):
r"""
Split PseudoVoigt function, a linear combination of two PseudoVoigt functions.
Parameters
----------
x (np.array): 2theta array.
amp1 (float): Peak amplitude for the first peak.
cen1 (float): Peak center for the first peak.
sigma1 (float): Sigma value or standard deviation for the first peak.
eta1 (float): Eta value for the first peak (mixing parameter).
amp2 (float): Peak amplitude for the second peak.
cen2 (float): Peak center for the second peak.
sigma2 (float): Sigma value or standard deviation for the second peak.
eta2 (float): Eta value for the second peak (mixing parameter).
Returns
-------
np.array: Split PseudoVoigt function.
Notes
-----
The Split PseudoVoigt function is defined as:
.. math::
SPV(x; A1, \mu1, \sigma1, \eta1, A2, \mu2, \sigma2, \eta2) = PV1(x; A1, \mu1, \sigma1, \eta1) + PV2(x; A2, \mu2, \sigma2, \eta2)
"""
return (pseudo_voigt(x, amplitude=amp1, center=cen1, sigma=sigma1, eta=eta1) +
pseudo_voigt(x, amplitude=amp2, center=cen2, sigma=sigma2, eta=eta2))
[docs]
def peak_fit_split_gaussian(theta, intensity, interval, bkg = 'Linear', height=1e+09, distance = 35):
"""
Peak fitting function for the Split PseudoVoigt model.
Given a set of 2theta and Intensity arrays, it fits the data to the
Split PseudoVoigt model and 2theta interval selected. It returns the fitting parameters.
Parameters
----------
theta (np.array): 2theta array.
intensity (np.array): Intensity array.
interval (list): 2theta interval for the peak fitting.
bkg (str, optional): Background model. Default is 'Linear'.
height (float, optional): Minimum height for the peaks. Default is 1e+09.
distance (float, optional): Minimum distance between the peaks. Default is 35.
Returns
-------
dois_theta_0 (list): Peak centers.
fwhm (list): Full Width at Half Maximum.
area (list): Area under the peaks.
r_squared (float): R-squared value of the fit.
out (lmfit.ModelResult): ModelResult. Inherited from the lmfit package.
comps (dict): Fitting components such as the backgroud and model function. Inherited from the lmfit package.
theta_fit (np.array): 2theta array for the fitting interval.
"""
done = False
while not done:
#time.sleep(0.5)
try:
theta_fit = []
intensity_fit = []
# Slicing the data for the selected peak fitting interval #
for i in range(len(theta)):
if theta[i] > interval[0] and theta[i] < interval[1]:
theta_fit.append(theta[i])
intensity_fit.append(intensity[i])
theta_fit=np.array(theta_fit)
intensity_fit=np.array(intensity_fit)
# Building the Voigt model with lmfit #
peaks, properties = find_peaks(intensity_fit, height=height, distance=distance)
if len(peaks) >= 2:
# Sort peaks by prominence and pick the top two
sorted_indices = np.argsort(properties['peak_heights'])[-2:]
peak_positions = theta_fit[peaks][sorted_indices]
peak_heights = properties['peak_heights'][sorted_indices]
if peak_positions[0] > peak_positions[1]:
amp2, cen2 = peak_heights[0], peak_positions[0]
amp1, cen1 = peak_heights[1], peak_positions[1]
else:
amp1, cen1 = peak_heights[0], peak_positions[0]
amp2, cen2 = peak_heights[1], peak_positions[1]
# Estimate sigma using the width of the peaks at half height
sigma1 = 0.1/2.355
sigma2 = 0.1/2.355
model = lm.Model(split_pseudo_voigt)
pars = model.make_params(amp1=amp1, cen1=cen1, sigma1=sigma1, eta1=0.5, amp2=amp2, cen2=cen2, sigma2=sigma2, eta2=0.5)
pars['eta1'].min, pars['eta1'].max = 0, 1
pars['eta2'].min, pars['eta2'].max = 0, 1
background = LinearModel(prefix='bkg_')
pars.update(background.guess(data=intensity_fit, x = theta_fit))
model += background
out = model.fit(intensity_fit, pars, x=theta_fit) # Fitting the data to the Voigt model #
comps = out.eval_components(x=theta_fit)
# Getting the parameters from the optimal fit #, bkg= self.bkg_model
dois_theta_0 = [out.params['cen1']*1, out.params['cen2']*1]
fwhm = [2.0*out.params['sigma1'], 2.0*out.params['sigma2']]
area = [out.params['amp1']*1, out.params['amp2']*1]
r_squared = out.rsquared
done = True
return dois_theta_0, fwhm, area, r_squared, out, comps, theta_fit
except ValueError or TypeError as e:
print(f'Fitting error, please wait: {e}! Please select a new fitting interval')
done = True
pass
# --- A counter function to index the created curves --- #
[docs]
def counter():
"""
Counter function. It counts the number of XRD data and returns its index.
Returns
-------
int: Index of the XRD data.
"""
counter.count += 1
return counter.count
counter.count = 0