Source code for SimEx.Calculators.XMDYNPhotonMatterInteractor

""":module XMDYNPhotonMatterInteractor: Module that holds the XMDYNPhotonMatterInteractor class."""
##########################################################################
#                                                                        #
# Copyright (C) 2015-2018 Carsten Fortmann-Grote                         #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu>                #
#                                                                        #
# This file is part of simex_platform.                                   #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by   #
# the Free Software Foundation, either version 3 of the License, or      #
# (at your option) any later version.                                    #
#                                                                        #
# simex_platform is distributed in the hope that it will be useful,      #
# but WITHOUT ANY WARRANTY; without even the implied warranty of         #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the          #
# GNU General Public License for more details.                           #
#                                                                        #
# You should have received a copy of the GNU General Public License      #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.  #
#                                                                        #
##########################################################################

import csv
import h5py
import numpy
import os
import shlex
import subprocess
import random
import string
from scipy import constants

from SimEx.Utilities.IOUtilities import get_dict_from_lines
from SimEx.Calculators.AbstractPhotonInteractor import AbstractPhotonInteractor
from SimEx.Parameters.PhotonMatterInteractorParameters import PhotonMatterInteractorParameters
from SimEx.Utilities.EntityChecks import checkAndSetInstance
from SimEx.Utilities.ParallelUtilities import getCUDAEnvironment

bohr_radius = constants.value('Bohr radius')

[docs]class XMDYNPhotonMatterInteractor(AbstractPhotonInteractor): """ :class XMDYNPhotonMatterInteractor: Wrapper class for photon-matter interaction calculations using the XMDYN code.""" def __init__( self, parameters=None, input_path=None, output_path=None, sample_path=None, seed=1, root_path=None, ): """ :param parameters: Parameters that govern the PMI calculation. :type parameters: dict :param input_path: Location of data needed by the PMI calculation (Laser source wavefront data). :type input_path: str :param output_path: Where to store the data generated by the PMI calculation. :type output_path: str :param sample_path: Location of the sample/target geometry file. Can be either a simS2E sample file or a pdb file. Specifying a pdb will first check if it's present in a database, if not, it will issue a query for the basename of the file to the RCSB protein data bank. :type sample_path: str :param root_path: Path to a root directory from which to restart a (previously failed) simulation. :type root_path: str """ # Initialize base class. super(XMDYNPhotonMatterInteractor, self).__init__(parameters,input_path,output_path) self.__provided_data = ['/data/snp_<7 digit index>/ff', '/data/snp_<7 digit index>/halfQ', '/data/snp_<7 digit index>/Nph', '/data/snp_<7 digit index>/r', '/data/snp_<7 digit index>/T', '/data/snp_<7 digit index>/Z', '/data/snp_<7 digit index>/xyz', '/data/snp_<7 digit index>/Sq_halfQ', '/data/snp_<7 digit index>/Sq_bound', '/data/snp_<7 digit index>/Sq_free', '/history/parent/detail', '/history/parent/parent', '/info/package_version', '/info/contact', '/info/datdescription', '/info/method_description', '/version'] self.__expected_data = ['/data/arrEhor', '/data/arrEver', '/params/Mesh/nSlices', '/params/Mesh/nx', '/params/Mesh/ny', '/params/Mesh/qxMax', '/params/Mesh/qxMin', '/params/Mesh/qyMax', '/params/Mesh/qyMin', '/params/Mesh/sliceMax', '/params/Mesh/sliceMin', '/params/Mesh/xMax', '/params/xMin', '/params/yMax', '/params/yMin', '/params/zCoord', '/params/beamline/printout', '/params/Rx', '/params/Ry', '/params/dRx', '/params/dRy', '/params/nval', '/params/photonEnergy', '/params/wDomain', '/params/wEFieldUnit', '/params/wFloatType', '/params/wSpace', '/params/xCentre', '/params/yCentre', '/info/package_version', '/info/contact', '/info/datdescription', '/info/method_description', '/misc/xFWHM', '/misc/yFWHM', '/version', ] self.parameters = parameters # Check sample path. if sample_path is None: raise ValueError( "A target/sample must be specified through the 'sample_path' argument." ) if not os.path.isfile(sample_path): print("Sample file %s was not found. Will attempt to query from RCSB protein data bank." % ( sample_path)) self.__sample_path = sample_path self.__cudadev = getCUDAEnvironment()['first_available_device_index'] self.__seed = seed self.root_path = root_path @property def root_path(self): """ Get the path to the restart data.""" return self.__root_path @root_path.setter def root_path(self, val): """ Set the path to the restart data.""" if val is None: val = os.path.join(self.output_path, "root."+''.join(random.choice(string.ascii_lowercase) for _ in range(8))) if not isinstance(val, str): raise TypeError("Parameter 'root_path' must be a path to a xmdyn root directory containing restart data.") self.__root_path = val if os.path.isdir(val): self.__is_restart = True @property def parameters(self): """ Query the calculator parameters. :return: The parameters of this Calculator. """ return self.__parameters @parameters.setter def parameters(self, val): """ Set (and check) the parameters. """ if val is None: self.__parameters = PhotonMatterInteractorParameters() if isinstance(val, dict): self.__parameters = PhotonMatterInteractorParameters(parameters_dictionary=val) if isinstance(val, PhotonMatterInteractorParameters): self.__parameters = val @property def sample_path(self): """ Get the path to the sample geometry file. """ return self.__sample_path @sample_path.setter def sample_path(self, value): self.__sample_path=checkAndSetInstance(str, value, 'sample.h5')
[docs] def expectedData(self): """ Query for the data expected by the Interactor. """ return self.__expected_data
[docs] def providedData(self): """ Query for the data provided by the Interactor. """ return self.__provided_data
[docs] def backengine(self): """ This method drives the backengine code.""" input_files = [] if os.path.isfile(self.input_path): input_files = [self.input_path] elif os.path.isdir(self.input_path): input_files = [ os.path.join( self.input_path, f) for f in os.listdir( self.input_path ) if f.split('.')[-1] == 'h5' and f.split('.')[-2] != 'opmd' ] input_files.sort() else: raise IOError("Input file %s does not exist or cannot be read." % (self.input_path) ) # Create output directory if not existing yet. if not os.path.isdir( self.output_path ): os.mkdir( self.output_path ) elif os.path.isfile( self.output_path ): raise IOError( "Output file %s already exists, cowardly refusing to overwrite." % (self.output_path) ) # Generate formatted output files (i.e. attach history to output file). for i,input_file in enumerate(input_files): output_file = os.path.join( self.output_path , 'pmi_out_%07d.h5' % (i+1) ) command = os.environ["XMDYN"] + \ ' --s2e_sample {0:s}'.format(self.sample_path) + \ ' --prop_out {0:s}'.format(input_file) + \ ' --pmi_out {0:s}'.format(output_file) + \ ' --xatom-exe {0:s}'.format(os.environ["XATOM"]) + \ ' --dbase {0:s}'.format(os.environ["XMDYNANDXATOMDBPATH"]) + \ ' --seed {0:d}'.format(self.__seed) + \ ' --s2e-rot "{0:f} {1:f} {2:f} {3:f}"'.format(*self.parameters.rotation) + \ ' --pmi_params -' + \ ' --root {0:s}'.format(self.__root_path) command = shlex.split(command) if self.__cudadev is not None: command = command + ['--cudadev', '{0:d}'.format(self.__cudadev)] if 'SIMEX_VERBOSE' in os.environ.keys(): print("PMI command:", " ".join(command)) proc = subprocess.Popen(command, stdout=subprocess.PIPE) out, err = proc.communicate() self.__process = proc proc.wait() if err == "" or err is None: return 0 else: return 1
@property def data(self): """ Query for the field data. """ return self.__data def _readH5(self): """ """ """ Private method for reading the hdf5 input and extracting the parameters and data relevant to initialize the object. """ pass # Nothing to be done since IO happens in backengine.
[docs] def saveH5(self): """ """ """ Private method to save the object to a file. :param output_path: The file where to save the object's data. :type output_path: str """ snapshots = self.__snapshots with h5py.File(self.output_path, 'w') as h5: self.setup_hierarchy(h5) for it, snp in enumerate(snapshots): # Load snapshot data as a dict snapshot_dict = self.load_snp_from_dir(snp) snapshot_dict['number_ophotons'] = self.__number_ophotons[it] snapshot_dict['timestamp'] = self.__timestamps[it] snapshot_dict['s2e_id'] = "{0:07d}".format(it+1) self._save_snapshot(h5, snapshot_dict)
[docs] def load_snp_from_dir(self, path_to_snapshot) : """ Load xmdyn output from an xmdyn directory. :param path: The directory path to xmdyn output. :type path: str :param snapshot_index: Which snapshot to load. :type snapshot_index: int :returns: The snapshot data. :rtype: dict """ print("LOG: Loading %s." % (path_to_snapshot)) xsnp = dict() xsnp['Z'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'Z.dat' )) # Atom numbers xsnp['T'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'T.dat' )) # Atom type xsnp['uid'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'uid.dat' )) #Unique atom ID. xsnp['r'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'r.dat' )) # Cartesian coordinates. xsnp['v'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'v.dat' )) # Cartesian velocities. xsnp['m'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'm.dat' )) # Masses. xsnp['q'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'q.dat' )) # Ion charge f0 = numpy.loadtxt(os.path.join(path_to_snapshot, 'f0.dat' )) # Form factors of each atom type. # Patch if only one atomic species. if f0.ndim == 1: f0 = f0[numpy.newaxis,:] xsnp['f0'] = f0 xsnp['Q'] = numpy.loadtxt(os.path.join(path_to_snapshot, 'Q.dat' )) # Wavenumber grid for form factors. xsnp['id'] = os.path.split(path_to_snapshot)[-1] return xsnp
[docs] def setup_hierarchy(self,h5_handle): """ Create all datagroups in the output hdf5 file. :param h5_handle: File handle to writable hdf5 file. :type h5_handle: h5py.File """ top_level_groups = [ 'data', 'history', 'info', 'misc', 'params', 'version', ] for i, group in enumerate(top_level_groups): h5_handle.create_group(group) # Store global parameters. # Photon energy. h5_handle['params'].create_dataset('photon_energy', data=self.__xmdyn_parameters['EPH']) h5_handle['params/photon_energy'].attrs['unit'] = 'eV' # Focus size x_fwhm = y_fwhm = self.__xmdyn_parameters['DIAM'] focus = h5_handle['params'].create_group('focus') focus.create_dataset('xFWHM', data=x_fwhm) focus.create_dataset('yFWHM', data=y_fwhm) focus.attrs['unit'] = "m" h5_handle['params/photon_energy'].attrs['unit'] = 'eV'
def _save_snapshot( self, h5_handle, snapshot_dict ) : """ Write a given snapshot to an open hdf5 file. :param h5_handle: Handle to an open writable hdf5 file. :type h5_handle: h5py.File :param snapshot_dict: The snapshot to save. :type snapshot: dict """ datgroup = h5_handle['/data'] snapshot_group = datgroup.create_group('snp_'+snapshot_dict['s2e_id']) snapshot_group.create_dataset('T_xmdyn', data=snapshot_dict['T'].astype(numpy.int32)) snapshot_group.create_dataset('uid', data=snapshot_dict['uid'].astype(numpy.int32)) snapshot_group.create_dataset('Z', data=snapshot_dict['Z']) #### # Needed? Or can we use uid and T from xmdyn output? Copied here from l 488 above. xyz = self.dbase_Zq2id( snapshot_dict['Z'] , snapshot_dict['q'] ).astype(numpy.int32) T = numpy.sort(numpy.unique(xyz)) #### snapshot_group.create_dataset('T', data=T.astype(numpy.int32)) ### WIP snapshot_group.create_dataset('xyz', data=xyz) snapshot_group.create_dataset('r', data=snapshot_dict['r'] .astype(numpy.float32)) snapshot_group.create_dataset('Nph', data=numpy.array( [snapshot_dict['number_ophotons'].astype(numpy.int32)])) snapshot_group.create_dataset('t', data=numpy.array( [snapshot_dict['timestamp'].astype(numpy.float32)])) halfQ = snapshot_dict['Q']/ ( 2.0 * numpy.pi * bohr_radius * 2.0 ) snapshot_group.create_dataset('halfQ', data=halfQ.astype(numpy.float32)) ### # do we need to sort rows in ff? See l 490 ff above. ### snapshot_group.create_dataset('ff', data=snapshot_dict['f0'].astype(numpy.float32), ) snapshot_group.create_dataset('Sq_halfQ', data=halfQ.astype(numpy.float32)) ### # Where do we get Sq_bound? ### snapshot_group.create_dataset('Sq_bound', data=numpy.zeros_like(halfQ)) snapshot_group.create_dataset('Sq_free', data=numpy.zeros_like(halfQ))
def h5_out2in( src , dest , *args ) : file_in = h5py.File( src , "r") file_out = h5py.File( dest , "w") grp_hist = file_out.create_group( "history" ) grp_hist_parent = file_out.create_group( "history/parent" ) grp_hist_parent_detail = file_out.create_group( "history/parent/detail" ) pre_s2e_module = os.path.basename( os.path.dirname( os.path.abspath( src ) ) ) print('Previous module: ' , pre_s2e_module) # Add attribute to history/parent grp_hist_parent.attrs['name'] = "_" + pre_s2e_module grp_srchist = file_in.get( "history/parent" ) ; file_out.copy( grp_srchist , grp_hist_parent ) # Copy everything to history except "data" & "history" for objname in list(file_in.keys()) : if objname != "data" and objname != "history" : x = file_in.get( objname ) if isinstance(x, h5py.Dataset): mygroup = file_in['/'] file_out["history/parent/detail/"+objname] = mygroup[objname][...] elif isinstance(x, h5py.Group): file_out.copy( x , "history/parent/detail/" + objname) else: print(objname , " has been SKIPPED!!") print(objname) else : print(' NOT:', objname) print(list(file_in['data'].keys())) print(list(file_in['data'].items())) # Create external link to parent's data #file_out['history/parent/detail/data'] = h5py.ExternalLink( src ,'/data') parent_module = os.path.basename( src ) [ : os.path.basename( src ) .find( '_out' ) ] file_out['history/parent/detail/data'] = h5py.ExternalLink( '../' + parent_module + '/' + os.path.basename( src ) , '/data' ) # Create your own groups grp_data = file_out.create_group( "data" ) grp_param = file_out.create_group( "params" ) grp_param = file_out.create_group( "misc" ) grp_param = file_out.create_group( "info" ) # Create s2e interface version interface = file_out.create_dataset("info/interface_version", (1,), dtype='f') interface[0] = 1.0 file_out.close() file_in.close() def _parse_xmdyn_xparams(input_file_path): """ Parse XMDYN parameters file and extract timing and photon information. :param input_file_path: Path to xmdyn input file. :type input_file_path: str :return: Parameter dictionary / object. :rtype: dict || XMDYNPhotonMatterInteractorParameters """ # Open parameters file. with open(input_file_path, newline='') as csv_handle: # Get a csv reader. reader = csv.reader(csv_handle, delimiter=' ' ) # Iterate through read lines. ret = get_dict_from_lines(reader) return ret