Source code for ectoolkits.analysis.atom_density

import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from ase.io import read, write
from ase.neighborlist import neighbor_list
from MDAnalysis.analysis.base import AnalysisBase

from ectoolkits.structures.slab import Slab
from ectoolkits.utils.utils import mic_1d
from ectoolkits.log import get_logger

logger = get_logger(__name__)


[docs]def analysis_run(args): if args.CONFIG: import json logger.info("Analysis Start!") with open(args.CONFIG, 'r') as f: inp = json.load(f) system = AtomDensity(inp) system.run() logger.info("FINISHED")
[docs]class AtomDensity(): def __init__(self, inp): logger.info("Perform Atom Density Analysis") # print file name self.twoD = False self.xyz_file = inp.get("xyz_file") if not os.path.isfile(self.xyz_file): raise FileNotFoundError logger.info("Read Structure File: {0}".format(inp["xyz_file"])) # print the cell self.cell = inp["cell"] logger.info("Read the Cell Info: {0}".format(inp["cell"])) # print surface 1 self.surf1 = inp["surf1"] self.surf1 = np.array(self.surf1) logger.info("Read Surface 1 Atoms Index: {0}".format(inp["surf1"])) # print surface 2 self.surf2 = inp["surf2"] self.surf2 = np.array(self.surf2) logger.info("Read Surface 2 Atoms Index: {0}".format(inp["surf2"])) if np.all(self.surf1 == self.surf2): self.twoD = True logger.info("Surface 1 and Surface 2 are the same, this is a 2D material.") self.atom_density = {} self.atom_density_z = {} # self.shift_center = inp["shift_center"] # logger.info("Density will shift center to water center: {0}".format(self.shift_center)) # slice for stucture index = inp.get("nframe", ":") self.density_type = inp.get("density_type") # Start reading structure logger.info("Now Start Reading Structures") logger.info("----------------------------") self.poses = read(self.xyz_file, index=index) self.poses[0] = Slab(self.poses[0]) for pos in self.poses: # wrap the cell pos.set_cell(self.cell) pos.set_pbc(True) pos.wrap() logger.info("Reading Structures is Finished") self.nframe = len(self.poses) logger.info("Read Frame Number: {0}".format(self.nframe)) self.natom = len(self.poses[0]) logger.info("Read Atom Number: {0}".format(self.natom))
[docs] def run(self): # read all structure and corresponding z self.all_z = self.get_all_z() # cell info self.cell_volume = self.poses[0].get_volume() _cell = self.poses[0].get_cell() self.xy_area = self.cell_volume/_cell[2][2] # surface 1 and 2 position along the trajectory self.surf1_z_list = self.get_surf1_z_list() self.surf2_z_list = self.get_surf2_z_list() self.surf1_z = self.surf1_z_list.mean() self.surf2_z = self.surf2_z_list.mean() # find the water center along the trajectory self.water_cent_list = self.get_water_cent_list() # water center relative to fisrt frame # self.water_cent_rel_s = self.water_cent_s - self.water_cent_s[0] # logger.info("Calculated Origin Water Center Position: {0} A".format(self.water_cent)) # logger.info("Water Center will shift to Cell Center: {0} A".format(self.cell[2]/2)) for param in self.density_type: idx_list = self.get_idx_list(param) self.get_atom_density(param, idx_list=idx_list) self.atom_density = pd.DataFrame(self.atom_density) self.atom_density_z = pd.DataFrame(self.atom_density_z)
# self.get_o_density() # self.dump_o_density()
[docs] def get_all_z(self) -> np.array: """get the z coordinates of atoms along trajectory _extended_summary_ Returns: np.array: the z coordinates of atoms """ all_z = [] for pos in self.poses: # z all_z.append(pos.get_positions().T[2]) all_z = np.stack(all_z) return all_z
[docs] def get_surf1_z_list(self) -> np.array: """calculate the surface 1 average position _extended_summary_ Returns: np.array: axis 0: traj """ surf1_z_list = self.all_z.T[self.surf1] surf1_z_list = surf1_z_list.T # wrap the surface atoms new_surf1_z_list = [] for surf1_z in surf1_z_list: surf1_z = mic_1d(surf1_z, self.cell[2]) new_surf1_z_list.append(surf1_z) surf1_z_list = np.stack(new_surf1_z_list) surf1_z_list = surf1_z_list.mean(axis=1) # all the z positions are wrapped to the first frame surf1_z_list = mic_1d(surf1_z_list, self.cell[2]) return surf1_z_list
[docs] def get_surf2_z_list(self) -> np.array: """calculate the surface 2 average position _extended_summary_ Returns: np.array: axis 0: traj """ surf2_z_list = self.all_z.T[self.surf2] surf2_z_list = surf2_z_list.T # wrap the surface atoms new_surf2_z_list = [] for surf2_z in surf2_z_list: surf2_z = mic_1d(surf2_z, self.cell[2]) new_surf2_z_list.append(surf2_z) surf2_z_list = np.stack(new_surf2_z_list) surf2_z_list = surf2_z_list.mean(axis=1) # all the z positions are wrapped to the first frame surf2_z_list = mic_1d(surf2_z_list, self.cell[2]) return surf2_z_list
[docs] def get_water_cent_list(self) -> np.array: water_cent_list = [] for surf1_z, surf2_z in zip(self.surf1_z_list, self.surf2_z_list): if surf2_z < surf1_z: surf2_z += self.cell[2] water_cent = (surf1_z + surf2_z)/2 water_cent_list.append(water_cent) water_cent_list = np.array(water_cent_list) return water_cent_list
[docs] def water_O_idx(self): # guess the o index of water i, j = neighbor_list('ij', self.poses[0], {('O', 'H'): 1.3}) cn = np.bincount(i) H2O_pair_list = [] Ow_idx = np.where(cn == 2)[0] np.savetxt(os.path.join(os.path.dirname( self.xyz_file), "Ow_idx.dat"), Ow_idx, fmt='%d') return Ow_idx
[docs] def get_idx_list(self, param): # get the idx from external input idx_method = param.get("idx_method") if idx_method == "manual": idx_list = self.get_idx_list_manual(param) elif idx_method == "all": idx_list = self.get_idx_list_all(param) else: logger.info("Not implement") raise ValueError return idx_list
[docs] def get_idx_list_manual(self, param): idx_list = param.get("idx_list") return idx_list
[docs] def get_idx_list_all(self, param): element = param.get("element") idx_list = self.poses[0].find_element_idx_list(element=element) return idx_list
[docs] def get_atom_density(self, param, idx_list): logger.info("START GETTING ATOM DENSITY") logger.info("----------------------------") dz = param.get("dz", 0.05) atom_z = self.all_z.T[idx_list] atom_z = atom_z - self.surf1_z_list # this might cause the num exceed cell boundary, need wrap the number atom_z_new = [] cell_z = self.poses[0].get_cell()[2][2] for num in atom_z.flatten(): atom_z_new.append(num % cell_z) atom_z = np.array(atom_z_new) # find the length between two surface if (self.surf1_z > self.surf2_z) or self.twoD: self.surf_space = self.surf2_z + cell_z - self.surf1_z else: self.surf_space = self.surf2_z - self.surf1_z bins = int(self.surf_space/dz) density, z = np.histogram( atom_z, bins=bins, range=(0, self.surf_space)) # throw the last one and move the number half bin z = z[:-1] + dz/2 # z = z[:-1] unit_conversion = self.get_unit_conversion(param, dz) # normalized wrt density of bulk water density = density/self.nframe * unit_conversion element = param.get("element") output_file = param.get("name", f"{element}_output") self.atom_density_z[output_file] = z self.atom_density[output_file] = density output_file = f"{output_file}.dat" np.savetxt( output_file, np.stack((z, density)).T, header="FIELD: z[A], atom_density" ) logger.info(f"Density Profile Data Save to {output_file}")
[docs] def get_unit_conversion(self, param, dz): density_unit = param.get("density_unit", "number") if density_unit == "water": bulk_density = 32/9.86**3 unit_conversion = self.xy_area*dz*bulk_density unit_conversion = 1.0/unit_conversion elif density_unit == "number": unit_conversion = 1.0 return unit_conversion
[docs] def get_ave_density(self, width_list): all_cent_density = {} all_cent_density[f"width"] = width_list for name, density in self.atom_density.items(): z = self.atom_density_z[name] cent = z.values[-1]/2 cent_density_list = [] for width in width_list: left_bound = cent - width/2 right_bound = cent + width/2 part_density = density[np.logical_and( (z >= left_bound), (z <= right_bound))] cent_density = part_density.mean() cent_density_list.append(cent_density) all_cent_density[f"{name}"] = cent_density_list return pd.DataFrame(all_cent_density)
[docs] def plot_density(self, sym=False): plt.rc('font', size=18) # controls default text size plt.rc('axes', titlesize=23) # fontsize of the title plt.rc('axes', labelsize=20) # fontsize of the x and y labels plt.rc('xtick', labelsize=18) # fontsize of the x tick labels plt.rc('ytick', labelsize=18) # fontsize of the y tick labels plt.rc('legend', fontsize=16) # fontsize of the legend # controls default text size plt.rc('lines', linewidth=2, markersize=10) plt.rc('axes', linewidth=2) plt.rc('xtick.major', size=10, width=2) plt.rc('ytick.major', size=10, width=2) fig = plt.figure(figsize=(16, 9)) ax = fig.add_subplot() for name, density in self.atom_density.items(): z = self.atom_density_z[name] density = density.values if sym: density = (np.flip(density) + density)/2 ax.plot(z, density, label=name) ax.legend() ax.set_ylabel("Density") ax.set_xlabel("z [A]") return fig
[docs]class Density(AnalysisBase): """ Calculate the density profile of a group of atoms along the z-axis for interface systems """ def __init__(self, atomgroup, **kwargs): super(Density, self).__init__(atomgroup.universe.trajectory, **kwargs) # Params # selection_area # selection_left_surface # selection_right_surface # solid |(surface left) liquid |(surface_right) solid # dz: default 0.05 resolution for density profile #self._param = param self.selection_area = kwargs.get("selection_area") self.selection_left_surface = kwargs.get("selection_left_surface") self.selection_right_surface = kwargs.get("selection_right_surface") self.dz = kwargs.get("dz", 0.05) self.output_file = kwargs.get("output_file", "density.dat") self.density_type = kwargs.get("density_type", "water") self._ag = atomgroup self.cellpar = self._ag.dimensions self.volume = self._ag.volume self.xy_area = self.volume/self.cellpar[2] def _prepare(self): # OPTIONAL # Called before iteration on the trajectory has begun. # Data structures can be set up at this time self.results.example_result = [] self._ag_area = self._ag.select_atoms(self.selection_area) self._ag_left_surface = self._ag.select_atoms(self.selection_left_surface) self._ag_right_surface = self._ag.select_atoms(self.selection_right_surface) # store the histogram of z area self.z_area = np.zeros(self.n_frames, len(self._ag_area)) self.z_left_surface = np.zeros(self.n_frames) self.z_right_surface = np.zeros(self.n_frames) def _single_frame(self): # REQUIRED # Called after the trajectory is moved onto each new frame. # store an example_result of `some_function` for a single frame # water density at each frame #self.results.example_result.append(some_function(self._ag, # self._parameter)) # get the z coordinates of atoms along trajectory #self._ag_area = self.z_left_surface[self._frame_index] = self._ag_left_surface.positions.T[2].mean() self.z_right_surface[self._frame_index] = self._ag_right_surface.positions.T[2].mean() self.z_area[self._frame_index] = self._ag_area.positions.T[2] def _conclude(self): # OPTIONAL # Called once iteration on the trajectory is finished. # Apply normalisation and averaging to results here. self.z_area = self.z_area - self.z_left_surface cell_z = self.cellpar[2] if (self.z_left_surface > self.z_right_surface) or self.twoD: self.electrolyte_width = self.z_right_surface + cell_z - self.z_left_surface else: self.electrolyte_width = self.z_right_surface - self.z_left_surface bins = int(self.electrolyte_widthe/self.dz) density, z = np.histogram( self.z_area, bins=bins, range=(0, self.electrolyte_width) ) # throw the last value and increase the half bin z = z[:-1] + self.dz/2 unit_conversion = self.get_unit_conversion() density = density/self.n_frames * unit_conversion # self.atom_density_z[output_file] = z # self.atom_density[output_file] = density np.savetxt( self.output_file, np.stack((z, density)).T, header="FIELD: z[A], atom_density" ) logger.info(f"Density Profile Data Save to {self.output_file}")
[docs] def get_unit_conversion(self): density_type = self.density_type if density_type == "water": bulk_density = 32/9.86**3 unit_conversion = self.xy_area*self.dz*bulk_density unit_conversion = 1.0/unit_conversion elif density_type == "number": unit_conversion = 1.0 return unit_conversion