Source code for SimEx.Utilities.IOUtilities

""":module IOUtilities: Module for input/output utilities.  """
##########################################################################
#                                                                        #
# Copyright (C) 2015-2018 Carsten Fortmann-Grote                         #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu>                #
#                                                                        #
# This file is part of simex_platform.                                   #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by   #
# the Free Software Foundation, either version 3 of the License, or      #
# (at your option) any later version.                                    #
#                                                                        #
# simex_platform is distributed in the hope that it will be useful,      #
# but WITHOUT ANY WARRANTY; without even the implied warranty of         #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the          #
# GNU General Public License for more details.                           #
#                                                                        #
# You should have received a copy of the GNU General Public License      #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.  #
#                                                                        #
##########################################################################

from Bio import PDB
from SimEx.Utilities import xpdb
from scipy.constants import m_e, c, e
from wpg.converters.genesis_v2 import read_genesis_file as genesis2
import h5py
import numpy
import os, shutil
import periodictable
import requests
import uuid

[docs]def getTmpFileName(): """ Create a unique filename :return: unique filename for temporary storage :rtype: str """ return os.getcwd()+"/"+str(uuid.uuid4())
[docs]def checkAndGetPDB( path ): """ Query a given pdb code from the PDB. :param path: The PDB code of the molecule to query. :type path: str :return: Path to the checked pdb file. """ if path is None: raise IOError( "The parameter 'path' must be a str.") if not isinstance (path, str): raise IOError( "The parameter 'path' must be a str.") # Setup paths and filenames. pdb_target_name = os.path.basename(path).split('.pdb')[0] pdb_target_dir = os.path.dirname(path) if not os.path.isfile( path ): # Query from pdb.org pdb_list = PDB.PDBList() if pdb_target_dir == '': pdb_target_dir = '.' try: print("PDB file %s could not be found. Attempting to query from protein database server." % (path)) download_target = pdb_list.retrieve_pdb_file( pdb_target_name, pdir=pdb_target_dir, file_format='pdb' ) if download_target in os.listdir( pdb_target_dir ): print("Successfully downloaded structure from PDB.") except: raise # Move and rename the downloaded file. shutil.move( download_target, path ) # Cleanup. shutil.rmtree('obsolete') return path
[docs]def loadXYZ( path=None): """ Load atomic structure from a xyz file and setup a dictionary readable by xmdyn calculator. :param path: The path to the xyz file. """ # Setup the return dictionary. atoms_dict = {'Z' : [], # Atomic number. 'r' : [], # Cartesian coordinates. 'selZ' : {}, # Abundance of each element. 'N' : 0, # Number of atoms. } with open(path) as fin: natoms = int(fin.readline()) title = fin.readline()[:-1] print("Reading %d atoms of %s from %s." % (natoms, title, path)) for x in range(natoms): line = fin.readline().split() # Get the element symbol from pdb. symbol = line[0] # Get the corresponding periodic table element as an instance. atom_obj = getattr(periodictable, symbol) # Query the atomic number. charge = atom_obj.number xyz = numpy.zeros(3, dtype="float64") xyz[:] = list(map(float, line[1:4])) # Write to dict. atoms_dict['Z'].append(charge) atoms_dict['r'].append(xyz*1e-10) if len(atoms_dict['Z']) == 0: raise IOError( "Error reading structure file %s. " % (path) ) # Get unique elements. for sel_Z in numpy.unique( atoms_dict['Z'] ) : atoms_dict['selZ'][sel_Z] = numpy.nonzero( atoms_dict['Z'] == sel_Z )[0] # Count number of atoms. atoms_dict['N'] = len( atoms_dict['Z'] ) # Convert to numpy arrays. atoms_dict['Z'] = numpy.array( atoms_dict['Z'] ) atoms_dict['r'] = numpy.array( atoms_dict['r'] ) # Return. return atoms_dict
[docs]def loadPDB( path = None ): """ Wrapper to convert a given pdb file to a sample dictionary used by e.g. the XMDYNCalculator. :param path: The path to the pdb file to be converted. :type path: str :return: The dictionary describing the sample molecule. :rtype: dict """ # Check if the sample is present, retrieve it from the pdb if not. target = checkAndGetPDB(path) # Convert to dict and return. return _pdbToS2ESampleDict(target)
def _pdbToS2ESampleDict(path=None): """ """ """ Workhorse function that converts a pdb file to the sample dictionary. :param path: Path to the pdb file to be loaded. :type path : string :return: The dictionary expected by downstream simex modules. :rtype : dict :throws IOError: Path not existing, not a pdb file or corrupt. """ try: if not os.path.isfile( path ): raise IOError() except: raise IOError( "Parameter 'path' must be a valid pdb file.") # Setup the return dictionary. atoms_dict = {'Z' : [], # Atomic number. 'r' : [], # Cartesian coordinates. 'selZ' : {}, # Abundance of each element. 'N' : 0, # Number of atoms. } # Attempt loading the pdb. try: #parser = PDB.PDBParser() #structure = parser.get_structure("sample", path) # Cope with > 100000 pdb atoms structure = xpdb.sloppyparser.get_structure("sample", path) # Get the atoms. atoms = structure.get_atoms() # Loop over atoms and get charge and coordinates. for atom in atoms: # Get the element symbol from pdb. symbol = atom.element.title() # Get the coordinates from pdb. coordinates = atom.coord # Get the corresponding periodic table element as an instance. atom_obj = getattr(periodictable, symbol) # Query the atomic number. charge = atom_obj.number # Write to dict. atoms_dict['Z'].append(charge) atoms_dict['r'].append(coordinates*1e-10) except: raise IOError( "Input file %s is not a valid pdb file. " % (path) ) if len(atoms_dict['Z']) == 0: raise IOError( "Input file %s is not a valid pdb file. " % (path) ) # Get unique elements. for sel_Z in numpy.unique( atoms_dict['Z'] ) : atoms_dict['selZ'][sel_Z] = numpy.nonzero( atoms_dict['Z'] == sel_Z )[0] # Count number of atoms. atoms_dict['N'] = len( atoms_dict['Z'] ) # Convert to numpy arrays. atoms_dict['Z'] = numpy.array( atoms_dict['Z'] ) atoms_dict['r'] = numpy.array( atoms_dict['r'] ) # Return. return atoms_dict
[docs]def pic2dist( pic_file_name, target='genesis'): """ Utility to extract particle data from openPMD and write into genesis distribution file. :param pic_file_name: Filename of openpmd input data file. :type pic_file_name: str :param target: The targeted file format (genesis || simplex). :type target: str """ # Check path. if not os.path.isfile(pic_file_name): raise RuntimeError("%s is not a file." % (pic_file_name)) # Check if input is native or openPMD. with h5py.File( pic_file_name, 'r' ) as h5_handle: timestep = list(h5_handle['data'].keys())[-1] positions = '/data/%s/particles/e/position/' % (timestep) momenta = '/data/%s/particles/e/momentum/' % (timestep) x_data = h5_handle[positions]['x'].value x_data_unit = h5_handle[positions]['x'].attrs['unitSI'] x = x_data*x_data_unit y_data = h5_handle[positions]['y'].value y_data_unit = h5_handle[positions]['y'].attrs['unitSI'] y = y_data*y_data_unit z_data = h5_handle[positions]['z'].value z_data_unit = h5_handle[positions]['z'].attrs['unitSI'] z = z_data*z_data_unit px_data = h5_handle[momenta]['x'].value px_data_unit = h5_handle[momenta]['x'].attrs['unitSI'] px = px_data*px_data_unit py_data = h5_handle[momenta]['y'].value py_data_unit = h5_handle[momenta]['y'].attrs['unitSI'] py = py_data*py_data_unit pz_data = h5_handle[momenta]['z'].value pz_data_unit = h5_handle[momenta]['z'].attrs['unitSI'] pz = pz_data*pz_data_unit # Convert to xprime, yprime. xprime = numpy.arctan(px/py) zprime = numpy.arctan(pz/py) # Calculate particle charge. charge_group = h5_handle['/data/%s/particles/e/charge/' %(timestep)] macroparticle_charge = charge_group.attrs['unitSI'] # Get number of particles and total charge. particle_patches = h5_handle['/data/%s/particles/e/particlePatches/numParticles' %(timestep)].value number_of_macroparticles = numpy.sum( particle_patches ) number_of_electrons_per_macroparticle = macroparticle_charge / e total_charge = number_of_macroparticles * macroparticle_charge # Calculate momentum psquare = px**2 + py**2 + pz**2 gamma = numpy.sqrt( 1. + psquare/((number_of_electrons_per_macroparticle * m_e*c)**2)) P = numpy.sqrt(psquare/((number_of_electrons_per_macroparticle * m_e*c)**2)) print("Number of electrons per macroparticle = ", number_of_electrons_per_macroparticle) print("Total charge = ", total_charge) print("Number of macroparticles = ", number_of_macroparticles) h5_handle.close() if target == 'genesis': return numpy.vstack([ x, xprime, z, zprime, y/c, P]).transpose(), total_charge elif target == 'simplex': return numpy.vstack([ y/c, x, xprime, z, zprime, gamma]).transpose(), total_charge
[docs]def genesis_dfl_to_wavefront(genesis_out, genesis_dfl): ''' Based on WPG/wpg/converters/genesis_v2.py ''' return genesis2(genesis_out, genesis_dfl)
[docs]def wgetData(url=None, path=None): """ Download a given url. """ # Local filename where data will be saved. local_filename = url.split('/')[-1] # Make https request. print("Attempting to download %s." % (url)) r = requests.get(url, stream=True) # Write to local file in chunks of 1 MB. with open(local_filename, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks f.write(chunk) # After successful write, close the https connection. f.close() # Return. print("Download completed and saved to %s." % (local_filename)) return local_filename
#if __name__ == "__main__": #main() #data, charge = pic2dist(sys.argv[1], sys.argv[2]) ## Setup header for distribution file. #comments = "? " #size = data.shape[0] #header = "VERSION = 1.0\nSIZE = %d\nCHARGE = %7.6E\nCOLUMNS X XPRIME Y YPRIME T P" % (size, charge) #if sys.argv[2] == 'genesis': #numpy.savetxt( fname='beam.dist', X=data, header=header, comments=comments) #if sys.argv[2] == 'simplex': #numpy.savetxt( fname='beam.dist', X=data)
[docs]def get_dict_from_lines(reader): """ Turn a list of [key, ' ', ..., value] elements into a dict. :params reader: An iterable that contains lists of strings in format [key, ' ', ' ', ..., value] :type: iterable (list, array, generator). """ # These fields shall be handled as numeric data. numeric_keys = [ 'N', 'Z', 'DIST', 'EPH', 'NPH', 'DIAM', 'FLU_MAX', 'T', 'T0', 'R0', 'DT', 'STEPS', 'PROGRESS', 'RANDSEED', 'RSTARTE', ] # Initialize return dictionary. ret = dict() # Iteratoe through all lines. for line in reader: # Skip empty lines and comments. if line == []: continue if line[0][0] == '#': continue # Get key-value pair (they're separated by random number of whitespaces. key, val = line[0], line[-1] # Fix numeric data. if key in numeric_keys: try: val = float(val) except: raise # Store on dict. ret[key] = val # Return finished dict. return ret