"""This module contains the simulation class and associated functions which are meant to encapsulate a LAMMPS simulation.
"""
from lammps import lammps
import npmc.molecule_class as mol
import npmc.atom_class as atm
import npmc.move_class as mvc
import os
import numpy as np
import random as rnd
import npmc.forcefield_class as ffc
import multiprocessing as mpc
#import dill
#from pathos.multiprocessing import ProcessPool
import concurrent.futures as cncf
from math import *
from subprocess import check_output
[docs]class Simulation(object):
"""This class encapsulates a LAMMPS simulation including its associated molecules and computes and fixes.
Parameters
----------
init_file : str
The file name of the initialization LAMMPS file. This file contains all the force field parameters computes and fixes that one wishes to specify for the LAMMPS simulation.
datafile : str
The LAMMPS data file which contains the coordinates of the atoms and bond, angle, and dihedral information.
dumpfile : str
The filename of the file which one wishes to dump the XYZ information of the simulation.
temp : float
The temperature which one wishes to run the simulation.
exclude : binary
A binary value that determines whether any interactions are excluded in the simulation.
numtrials : int
The number of trial rotations for each regrowth step in the configurationally biased moves
restart : binary
A binary value that determines whether this is a new simulation or a restart of a previous simulation.
"""
def __init__(self,init_file,datafile,dumpfile,temp,max_disp=1.0,type_lengths=(5,13),numtrials=5,anchortype=2,restart=False,parallel=False):
dname = os.path.dirname(os.path.abspath(init_file))
print("Configuration file is ".format(init_file))
print('Directory name is '.format(dname))
os.chdir(dname)
self.lmp = lammps("",["-echo","none","-screen","lammps.out"])
self.lmp.file(os.path.abspath(init_file))
#if parrallel:
# pool = mpc.Pool()
# self.lmp_clones = pool.map(self.clone_lammps,range(numtrials))
# pool.close()
self.molecules = mol.constructMolecules(datafile)
self.atomlist = self.get_atoms()
self.lmp.command("thermo 1")
self.lmp.command("thermo_style custom step etotal ke temp pe ebond eangle edihed eimp evdwl ecoul elong press")
self.temp = temp
self.dumpfile = os.path.abspath(dumpfile)
self.datafile = os.path.abspath(datafile)
self.init_file = os.path.abspath(init_file)
self.exclude=False
self.initializeGroups(self.lmp)
self.initializeComputes(self.lmp)
self.initializeFixes(self.lmp)
self.initializeMoves(anchortype,max_disp,type_lengths,numtrials,parallel)
self.initialize_data_files(restart)
self.step=0 if not restart else self.get_last_step_number()
self.update_neighbor_list()
def clone_lammps(self):
lmp2 = lammps("",["-echo","none","-screen","lammps.out"])
lmp2.file(os.path.abspath(self.init_file))
lmp2.command("thermo 1")
lmp2.command("thermo_style custom step etotal ke temp pe ebond eangle edihed eimp evdwl ecoul elong press")
self.initializeGroups(lmp2)
self.initializeComputes(lmp2)
self.initializeFixes(lmp2)
return(lmp2)
[docs] def initializeGroups(self,lmp):
"""Initialize the LAMMPS groups that one wishes to use in the simulation.
"""
lmp.command("group silver type 1")
lmp.command("group adsorbate type 2 3 4 5 6")
[docs] def initializeComputes(self,lmp):
"""Initializes the LAMMPS computes that one wishes to use in the simulation.
"""
lmp.command("compute pair_pe all pe")
lmp.command("compute mol_pe all pe dihedral")
lmp.command("compute coul_pe all pair lj/cut/coul/debye ecoul")
lmp.command("compute lj_pe all pair lj/cut/coul/debye evdwl")
lmp.command("compute pair_total all pair lj/cut/coul/debye")
[docs] def initializeFixes(self,lmp):
"""Initializes the fixes one wishes to use in the simulation.
"""
lmp.command("fix fxfrc silver setforce 0. 0. 0.")
def initializeMoves(self,anchortype,max_disp,type_lengths,numtrials,parallel=False):
cbmc_move = mvc.CBMCRegrowth(self,anchortype,numtrials)
translate_move = mvc.TranslationMove(self,max_disp,[1])
swap_move = mvc.CBMCSwap(self,anchortype,type_lengths,parallel=parallel)
rotation_move = mvc.RotationMove(self,anchortype,0.1745)
self.moves = [cbmc_move,translate_move,swap_move,rotation_move]
def initialize_data_files(self,restart=False):
if not restart:
with open('Potential_Energy.txt','w') as potential_file, open('Acceptance_Rate.txt','w') as acceptance_file:
potential_file.write("step\tEnergy\tmove\tAccepted?\n")
acceptance_file.write("step\t"+"\t".join(["#"+move.move_name+"\tRate" for move in self.moves])+"\n")
self.potential_file = open('Potential_Energy.txt','a')
self.acceptance_file = open("Acceptance_Rate.txt",'a')
def get_last_step_number(self):
last_line = check_output(["tail","-1",self.potential_file.name])
return(int(last_line.decode().split('\t')[0]))
[docs] def minimize(self,force_tol=1e-3,e_tol=1e-5,max_iter=200):
"""Minimizes the system using LAMMPS minimize function.
Parameters
----------
force_tol : float, optional
The force tolerance used in the minimization routine.
e_tol : float, optional
The energy tolerance used in the minimization routine.
max_iter : int, optional
The maximum allowed iterations in the minimization procedure.
"""
#self.lmp.command("dump xyzdump all xyz 10 "+str(self.dumpfile))
self.lmp.command("minimize "+str(force_tol)+" "+str(e_tol)+" "+str(max_iter)+" "+str(max_iter*10))
self.get_coords()
[docs] def dump_group(self,group_name,filename):
"""Dumps the atoms of the specified group to an XYZ file specified by filename
Parameters
----------
group_name : str
The group ID of the group of atoms that one wishes to dump
filename : str
The name of the file that the group of atoms will be dumped to.
As the specified format is XYZ it is a good idea to append .xyz to the end of the filename.
"""
self.lmp.command("write_dump "+group_name+" xyz "+filename)
[docs] def dump_atoms(self):
"""Dump the atom XYZ info to the dumpfile specified in the Simulation's dumpfile variable.
"""
self.lmp.command("write_dump all xyz "+self.dumpfile+" modify append yes")
def getCoulPE(self):
self.lmp.command("run 0 post no")
return self.lmp.extract_compute("coul_pe",0,0)
def getVdwlPE(self):
self.lmp.command("run 0 post no")
return self.lmp.extract_compute("lj_pe",0,0)
def get_pair_PE(self):
self.lmp.command("run 1 pre no post no")
return(self.lmp.extract_compute("pair_total",0,0))
def parallel_pair_PE(self,lmps,coords):
#import pdb;pdb.set_trace()
n = mpc.cpu_count()
with cncf.ThreadPoolExecutor(n) as executor:
energies = executor.map(self.get_clone_pair_PE,lmps,coords)
return energies
def get_clone_pair_PE(self,lmp2,coords):
self.update_clone_coords(lmp2,coords)
lmp2.command("run 0 post no")
return(lmp2.extract_compute("pair_total",0,0))
def get_total_PE(self):
self.lmp.command("run 0 post no")
return self.lmp.extract_compute("thermo_pe",0,0)
[docs] def assignAtomTypes(self):
"""Assign element names to the atom types in the simulation.
"""
atoms = atm.loadAtoms(self.datafile)
atomtypes =np.unique([atom.atomType for atom in atoms])
atom_type_dict={}
for atom_type in atomtypes:
atom_type_dict[atom_type]=raw_input("Enter element name for atom type "+str(atom_type)+": ")
self.atom_type_dict=atom_type_dict
def update_neighbor_list(self):
self.lmp.command("run 0 post no")
def turn_on_all_atoms(self):
self.lmp.command("neigh_modify exclude none")
if self.exclude:
self.exclude_type(self.excluded_types[0],self.excluded_types[1])
self.update_neighbor_list()
[docs] def turn_off_atoms(self,atomIDs):
"""Turns off short range interactions with specified atoms using 'neigh_modify exclude' command in LAMMPS
Parameters
----------
atomIDs : list of type int
A list of atom IDs of the atoms that will be turned off in the simulation
"""
stratoms = ' '.join(map(str,map(int,atomIDs)))
self.lmp.command("group offatoms intersect all all")
self.lmp.command("group offatoms clear")
self.lmp.command("group offatoms id "+stratoms)
self.lmp.command("neigh_modify exclude group offatoms all")
if self.exclude:
self.exclude_type(self.excluded_types[0],self.excluded_types[1])
self.update_neighbor_list()
def exclude_type(self,type1,type2):
self.excluded_types = (type1,type2)
self.exclude=True
self.lmp.command("neigh_modify exclude type %d %d" % (type1,type2))
[docs] def getRandomMolecule(self):
"""Returns a randomly selected molecule from the LAMMPS datafile associated with the given instance.
"""
return(rnd.choice(self.molecules))
def get_atoms(self):
atomlist = []
for key in self.molecules:
atomlist.extend(self.molecules[key].atoms)
return atomlist
def get_coords(self):
indxs = np.argsort([atom.atomID for atom in self.atomlist],axis=0)
coords = self.lmp.gather_atoms("x",1,3)
for idx,i in zip(indxs,range(len(self.atomlist))):
self.atomlist[idx].position[0]=float(coords[i*3])
self.atomlist[idx].position[1]=float(coords[i*3+1])
self.atomlist[idx].position[2]=float(coords[i*3+2])
[docs] def get_atom_coords(self):
"""Returns positions of each atom in a Nx3 array
Returns
--------
atom_coords : float array
A Nx3 array with each row representing an atom and the columns containing the x, ym and z coordinates in that order.
"""
atom_coords = np.array([atom.position for atom in self.atomlist])
return atom_coords
def update_coords(self):
indxs = np.argsort([atom.atomID for atom in self.atomlist],axis=0)
coords = self.lmp.gather_atoms("x",1,3)
for idx,i in zip(indxs,range(len(self.atomlist))):
coords[i*3]=self.atomlist[idx].position[0]
coords[i*3+1]=self.atomlist[idx].position[1]
coords[i*3+2]=self.atomlist[idx].position[2]
self.lmp.scatter_atoms("x",1,3,coords)
def update_clone_coords(self,lmp2,atom_coords):
indxs = np.argsort([atom.atomID for atom in self.atomlist],axis=0)
coords = lmp2.gather_atoms("x",1,3)
for idx,i in zip(indxs,range(atom_coords.shape[0])):
coords[i*3]=atom_coords[idx][0]
coords[i*3+1]=atom_coords[idx][1]
coords[i*3+2]=atom_coords[idx][2]
lmp2.scatter_atoms("x",1,3,coords)
def revert_coords(self,old_positions):
indxs = np.argsort([atom.atomID for atom in self.atomlist],axis=0)
coords = self.lmp.gather_atoms("x",1,3)
for idx,i in zip(indxs,range(len(old_positions))):
coords[i*3]=old_positions[idx][0]
coords[i*3+1]=old_positions[idx][1]
coords[i*3+2]=old_positions[idx][2]
self.lmp.scatter_atoms("x",1,3,coords)
self.get_coords()
def perform_mc_move(self):
move = rnd.choice(self.moves)
old_positions = np.copy([atom.position for atom in self.atomlist])
accepted = move.move()
if (not accepted):
self.revert_coords(old_positions)
self.step+=1
self.update_coords()
new_energy = self.get_total_PE()
self.potential_file.write(str(self.step)+'\t'+str(new_energy)+'\t'+str(move.move_name)+'\t'+str(accepted)+'\n')
self.acceptance_file.write(str(self.step)+"\t"+"\t".join([str(mc_move.num_moves)+"\t"+str(mc_move.get_acceptance_rate()) for mc_move in self.moves])+"\n")
self.acceptance_file.flush()
self.potential_file.flush()
return accepted