""":module XMDYNPhotonMatterInteractor: Module that holds the XMDYNPhotonMatterInteractor class."""
##########################################################################
# #
# Copyright (C) 2015-2018 Carsten Fortmann-Grote #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu> #
# #
# This file is part of simex_platform. #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# simex_platform is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
##########################################################################
import csv
import h5py
import numpy
import os
import shlex
import subprocess
import random
import string
from scipy import constants
from SimEx.Utilities.IOUtilities import get_dict_from_lines
from SimEx.Calculators.AbstractPhotonInteractor import AbstractPhotonInteractor
from SimEx.Parameters.PhotonMatterInteractorParameters import PhotonMatterInteractorParameters
from SimEx.Utilities.EntityChecks import checkAndSetInstance
from SimEx.Utilities.ParallelUtilities import getCUDAEnvironment
bohr_radius = constants.value('Bohr radius')
[docs]class XMDYNPhotonMatterInteractor(AbstractPhotonInteractor):
""" :class XMDYNPhotonMatterInteractor: Wrapper class for photon-matter interaction calculations using the XMDYN code."""
def __init__(
self,
parameters=None,
input_path=None,
output_path=None,
sample_path=None,
seed=1,
root_path=None,
):
"""
:param parameters: Parameters that govern the PMI calculation.
:type parameters: dict
:param input_path: Location of data needed by the PMI calculation (Laser source wavefront data).
:type input_path: str
:param output_path: Where to store the data generated by the PMI calculation.
:type output_path: str
:param sample_path: Location of the sample/target geometry file. Can be either a simS2E sample file or a pdb file. Specifying a pdb will first check if it's present in a database, if not, it will issue a query for the basename of the file to the RCSB protein data bank.
:type sample_path: str
:param root_path: Path to a root directory from which to restart a (previously failed) simulation.
:type root_path: str
"""
# Initialize base class.
super(XMDYNPhotonMatterInteractor, self).__init__(parameters,input_path,output_path)
self.__provided_data = ['/data/snp_<7 digit index>/ff',
'/data/snp_<7 digit index>/halfQ',
'/data/snp_<7 digit index>/Nph',
'/data/snp_<7 digit index>/r',
'/data/snp_<7 digit index>/T',
'/data/snp_<7 digit index>/Z',
'/data/snp_<7 digit index>/xyz',
'/data/snp_<7 digit index>/Sq_halfQ',
'/data/snp_<7 digit index>/Sq_bound',
'/data/snp_<7 digit index>/Sq_free',
'/history/parent/detail',
'/history/parent/parent',
'/info/package_version',
'/info/contact',
'/info/datdescription',
'/info/method_description',
'/version']
self.__expected_data = ['/data/arrEhor',
'/data/arrEver',
'/params/Mesh/nSlices',
'/params/Mesh/nx',
'/params/Mesh/ny',
'/params/Mesh/qxMax',
'/params/Mesh/qxMin',
'/params/Mesh/qyMax',
'/params/Mesh/qyMin',
'/params/Mesh/sliceMax',
'/params/Mesh/sliceMin',
'/params/Mesh/xMax',
'/params/xMin',
'/params/yMax',
'/params/yMin',
'/params/zCoord',
'/params/beamline/printout',
'/params/Rx',
'/params/Ry',
'/params/dRx',
'/params/dRy',
'/params/nval',
'/params/photonEnergy',
'/params/wDomain',
'/params/wEFieldUnit',
'/params/wFloatType',
'/params/wSpace',
'/params/xCentre',
'/params/yCentre',
'/info/package_version',
'/info/contact',
'/info/datdescription',
'/info/method_description',
'/misc/xFWHM',
'/misc/yFWHM',
'/version',
]
self.parameters = parameters
# Check sample path.
if sample_path is None:
raise ValueError( "A target/sample must be specified through the 'sample_path' argument." )
if not os.path.isfile(sample_path):
print("Sample file %s was not found. Will attempt to query from RCSB protein data bank." % ( sample_path))
self.__sample_path = sample_path
self.__cudadev = getCUDAEnvironment()['first_available_device_index']
self.__seed = seed
self.root_path = root_path
@property
def root_path(self):
""" Get the path to the restart data."""
return self.__root_path
@root_path.setter
def root_path(self, val):
""" Set the path to the restart data."""
if val is None:
val = os.path.join(self.output_path, "root."+''.join(random.choice(string.ascii_lowercase) for _ in range(8)))
if not isinstance(val, str):
raise TypeError("Parameter 'root_path' must be a path to a xmdyn root directory containing restart data.")
self.__root_path = val
if os.path.isdir(val):
self.__is_restart = True
@property
def parameters(self):
""" Query the calculator parameters.
:return: The parameters of this Calculator.
"""
return self.__parameters
@parameters.setter
def parameters(self, val):
""" Set (and check) the parameters. """
if val is None:
self.__parameters = PhotonMatterInteractorParameters()
if isinstance(val, dict):
self.__parameters = PhotonMatterInteractorParameters(parameters_dictionary=val)
if isinstance(val, PhotonMatterInteractorParameters):
self.__parameters = val
@property
def sample_path(self):
""" Get the path to the sample geometry file. """
return self.__sample_path
@sample_path.setter
def sample_path(self, value):
self.__sample_path=checkAndSetInstance(str, value, 'sample.h5')
[docs] def expectedData(self):
""" Query for the data expected by the Interactor. """
return self.__expected_data
[docs] def providedData(self):
""" Query for the data provided by the Interactor. """
return self.__provided_data
[docs] def backengine(self):
""" This method drives the backengine code."""
input_files = []
if os.path.isfile(self.input_path):
input_files = [self.input_path]
elif os.path.isdir(self.input_path):
input_files = [ os.path.join( self.input_path, f) for f in os.listdir( self.input_path ) if f.split('.')[-1] == 'h5' and f.split('.')[-2] != 'opmd' ]
input_files.sort()
else:
raise IOError("Input file %s does not exist or cannot be read." % (self.input_path) )
# Create output directory if not existing yet.
if not os.path.isdir( self.output_path ):
os.mkdir( self.output_path )
elif os.path.isfile( self.output_path ):
raise IOError( "Output file %s already exists, cowardly refusing to overwrite." % (self.output_path) )
# Generate formatted output files (i.e. attach history to output file).
for i,input_file in enumerate(input_files):
output_file = os.path.join( self.output_path , 'pmi_out_%07d.h5' % (i+1) )
command = os.environ["XMDYN"] + \
' --s2e_sample {0:s}'.format(self.sample_path) + \
' --prop_out {0:s}'.format(input_file) + \
' --pmi_out {0:s}'.format(output_file) + \
' --xatom-exe {0:s}'.format(os.environ["XATOM"]) + \
' --dbase {0:s}'.format(os.environ["XMDYNANDXATOMDBPATH"]) + \
' --seed {0:d}'.format(self.__seed) + \
' --s2e-rot "{0:f} {1:f} {2:f} {3:f}"'.format(*self.parameters.rotation) + \
' --pmi_params -' + \
' --root {0:s}'.format(self.__root_path)
command = shlex.split(command)
if self.__cudadev is not None:
command = command + ['--cudadev', '{0:d}'.format(self.__cudadev)]
if 'SIMEX_VERBOSE' in os.environ.keys():
print("PMI command:", " ".join(command))
proc = subprocess.Popen(command, stdout=subprocess.PIPE)
out, err = proc.communicate()
self.__process = proc
proc.wait()
if err == "" or err is None:
return 0
else:
return 1
@property
def data(self):
""" Query for the field data. """
return self.__data
def _readH5(self):
""" """
""" Private method for reading the hdf5 input and extracting the parameters and data relevant to initialize the object. """
pass # Nothing to be done since IO happens in backengine.
[docs] def saveH5(self):
""" """
"""
Private method to save the object to a file.
:param output_path: The file where to save the object's data.
:type output_path: str
"""
snapshots = self.__snapshots
with h5py.File(self.output_path, 'w') as h5:
self.setup_hierarchy(h5)
for it, snp in enumerate(snapshots):
# Load snapshot data as a dict
snapshot_dict = self.load_snp_from_dir(snp)
snapshot_dict['number_ophotons'] = self.__number_ophotons[it]
snapshot_dict['timestamp'] = self.__timestamps[it]
snapshot_dict['s2e_id'] = "{0:07d}".format(it+1)
self._save_snapshot(h5, snapshot_dict)
[docs] def load_snp_from_dir(self, path_to_snapshot) :
""" Load xmdyn output from an xmdyn directory.
:param path: The directory path to xmdyn output.
:type path: str
:param snapshot_index: Which snapshot to load.
:type snapshot_index: int
:returns: The snapshot data.
:rtype: dict
"""
print("LOG: Loading %s." % (path_to_snapshot))
xsnp = dict()
xsnp['Z'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'Z.dat' )) # Atom numbers
xsnp['T'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'T.dat' )) # Atom type
xsnp['uid'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'uid.dat' )) #Unique atom ID.
xsnp['r'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'r.dat' )) # Cartesian coordinates.
xsnp['v'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'v.dat' )) # Cartesian velocities.
xsnp['m'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'm.dat' )) # Masses.
xsnp['q'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'q.dat' )) # Ion charge
f0 = numpy.loadtxt(os.path.join(path_to_snapshot, 'f0.dat' )) # Form factors of each atom type.
# Patch if only one atomic species.
if f0.ndim == 1:
f0 = f0[numpy.newaxis,:]
xsnp['f0'] = f0
xsnp['Q'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'Q.dat' )) # Wavenumber grid for form factors.
xsnp['id'] = os.path.split(path_to_snapshot)[-1]
return xsnp
[docs] def setup_hierarchy(self,h5_handle):
""" Create all datagroups in the output hdf5 file.
:param h5_handle: File handle to writable hdf5 file.
:type h5_handle: h5py.File
"""
top_level_groups = [
'data',
'history',
'info',
'misc',
'params',
'version',
]
for i, group in enumerate(top_level_groups):
h5_handle.create_group(group)
# Store global parameters.
# Photon energy.
h5_handle['params'].create_dataset('photon_energy', data=self.__xmdyn_parameters['EPH'])
h5_handle['params/photon_energy'].attrs['unit'] = 'eV'
# Focus size
x_fwhm = y_fwhm = self.__xmdyn_parameters['DIAM']
focus = h5_handle['params'].create_group('focus')
focus.create_dataset('xFWHM', data=x_fwhm)
focus.create_dataset('yFWHM', data=y_fwhm)
focus.attrs['unit'] = "m"
h5_handle['params/photon_energy'].attrs['unit'] = 'eV'
def _save_snapshot( self, h5_handle, snapshot_dict ) :
""" Write a given snapshot to an open hdf5 file.
:param h5_handle: Handle to an open writable hdf5 file.
:type h5_handle: h5py.File
:param snapshot_dict: The snapshot to save.
:type snapshot: dict
"""
datgroup = h5_handle['/data']
snapshot_group = datgroup.create_group('snp_'+snapshot_dict['s2e_id'])
snapshot_group.create_dataset('T_xmdyn', data=snapshot_dict['T'].astype(numpy.int32))
snapshot_group.create_dataset('uid', data=snapshot_dict['uid'].astype(numpy.int32))
snapshot_group.create_dataset('Z', data=snapshot_dict['Z'])
####
# Needed? Or can we use uid and T from xmdyn output? Copied here from l 488 above.
xyz = self.dbase_Zq2id( snapshot_dict['Z'] , snapshot_dict['q'] ).astype(numpy.int32)
T = numpy.sort(numpy.unique(xyz))
####
snapshot_group.create_dataset('T', data=T.astype(numpy.int32))
### WIP
snapshot_group.create_dataset('xyz', data=xyz)
snapshot_group.create_dataset('r', data=snapshot_dict['r'] .astype(numpy.float32))
snapshot_group.create_dataset('Nph', data=numpy.array( [snapshot_dict['number_ophotons'].astype(numpy.int32)]))
snapshot_group.create_dataset('t', data=numpy.array( [snapshot_dict['timestamp'].astype(numpy.float32)]))
halfQ = snapshot_dict['Q']/ ( 2.0 * numpy.pi * bohr_radius * 2.0 )
snapshot_group.create_dataset('halfQ', data=halfQ.astype(numpy.float32))
###
# do we need to sort rows in ff? See l 490 ff above.
###
snapshot_group.create_dataset('ff', data=snapshot_dict['f0'].astype(numpy.float32), )
snapshot_group.create_dataset('Sq_halfQ', data=halfQ.astype(numpy.float32))
###
# Where do we get Sq_bound?
###
snapshot_group.create_dataset('Sq_bound', data=numpy.zeros_like(halfQ))
snapshot_group.create_dataset('Sq_free', data=numpy.zeros_like(halfQ))
def h5_out2in( src , dest , *args ) :
file_in = h5py.File( src , "r")
file_out = h5py.File( dest , "w")
grp_hist = file_out.create_group( "history" )
grp_hist_parent = file_out.create_group( "history/parent" )
grp_hist_parent_detail = file_out.create_group( "history/parent/detail" )
pre_s2e_module = os.path.basename( os.path.dirname( os.path.abspath( src ) ) )
print('Previous module: ' , pre_s2e_module)
# Add attribute to history/parent
grp_hist_parent.attrs['name'] = "_" + pre_s2e_module
grp_srchist = file_in.get( "history/parent" ) ;
file_out.copy( grp_srchist , grp_hist_parent )
# Copy everything to history except "data" & "history"
for objname in list(file_in.keys()) :
if objname != "data" and objname != "history" :
x = file_in.get( objname )
if isinstance(x, h5py.Dataset):
mygroup = file_in['/']
file_out["history/parent/detail/"+objname] = mygroup[objname][...]
elif isinstance(x, h5py.Group):
file_out.copy( x , "history/parent/detail/" + objname)
else:
print(objname , " has been SKIPPED!!")
print(objname)
else :
print(' NOT:', objname)
print(list(file_in['data'].keys()))
print(list(file_in['data'].items()))
# Create external link to parent's data
#file_out['history/parent/detail/data'] = h5py.ExternalLink( src ,'/data')
parent_module = os.path.basename( src ) [ : os.path.basename( src ) .find( '_out' ) ]
file_out['history/parent/detail/data'] = h5py.ExternalLink( '../' + parent_module + '/' + os.path.basename( src ) , '/data' )
# Create your own groups
grp_data = file_out.create_group( "data" )
grp_param = file_out.create_group( "params" )
grp_param = file_out.create_group( "misc" )
grp_param = file_out.create_group( "info" )
# Create s2e interface version
interface = file_out.create_dataset("info/interface_version", (1,), dtype='f')
interface[0] = 1.0
file_out.close()
file_in.close()
def _parse_xmdyn_xparams(input_file_path):
""" Parse XMDYN parameters file and extract timing and photon information.
:param input_file_path: Path to xmdyn input file.
:type input_file_path: str
:return: Parameter dictionary / object.
:rtype: dict || XMDYNPhotonMatterInteractorParameters
"""
# Open parameters file.
with open(input_file_path, newline='') as csv_handle:
# Get a csv reader.
reader = csv.reader(csv_handle, delimiter=' ' )
# Iterate through read lines.
ret = get_dict_from_lines(reader)
return ret