# Source code for SimEx.Utilities.ParallelUtilities
""":module ParallelUtilities: Hosts utilities to query HPC runtime parameters."""
##########################################################################
# #
# Copyright (C) 2016-2017 Carsten Fortmann-Grote #
# 2016-2017 Sergey Yakubov #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu> #
# #
# This file is part of simex_platform. #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# simex_platform is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
##########################################################################
import os
import subprocess
from distutils.version import StrictVersion
from py3nvml import py3nvml as nvml
def _getParallelResourceInfoFromEnv():
    """Read the parallel resources from the SIMEX_NCORES / SIMEX_NNODES variables.

    :return: Dictionary with the integer entries 'NCores' and 'NNodes'.
    :rtype: dict
    :raises IOError: If either variable is missing, is not an integer, or is
        not strictly positive.
    """
    try:
        ncores = int(os.environ['SIMEX_NCORES'])
        nnodes = int(os.environ['SIMEX_NNODES'])
    except (KeyError, ValueError) as err:
        raise IOError("SIMEX_NNODES and SIMEX_NCORES are set incorrectly") from err
    if nnodes <= 0 or ncores <= 0:
        raise IOError("SIMEX_NNODES and SIMEX_NCORES are set incorrectly")
    return {'NCores': ncores, 'NNodes': nnodes}
def _slurmCpuGroupCount(group):
    """Return the total CPU count of one SLURM_JOB_CPUS_PER_NODE item.

    Accepts a plain count ('36'), SLURM's documented repeat form '72(x2)'
    (72 CPUs on each of 2 nodes) and the legacy spelling '72x(2)'.

    :raises ValueError: If the item cannot be parsed.
    """
    text = group.strip()
    count, paren, rest = text.partition('(')
    if not paren:
        return int(count)
    if not rest.endswith(')'):
        raise ValueError("Malformed SLURM CPU group '%s'" % group)
    # Strip the 'x' marker from whichever side of the parenthesis it is on.
    return int(count.rstrip('x')) * int(rest[:-1].lstrip('x'))


def _getParallelResourceInfoFromSlurm():
    """Derive the parallel resources from the SLURM job environment.

    SLURM_JOB_CPUS_PER_NODE is a comma separated list of items such as
    '72(x2),36' (two nodes with 72 CPUs each plus one node with 36 CPUs); the
    total CPU count over all items becomes 'NCores'.

    :return: Dictionary with the integer entries 'NNodes' and 'NCores'.
    :rtype: dict
    :raises IOError: If the SLURM variables are missing, unparsable, or yield a
        non-positive node or core count.
    """
    message = ("Cannot use SLURM_JOB_NUM_NODES and/or SLURM_JOB_CPUS_PER_NODE. "
               "Set SIMEX_NNODES and SIMEX_NCORES instead")
    try:
        nnodes = int(os.environ['SLURM_JOB_NUM_NODES'])
        ncores = sum(_slurmCpuGroupCount(group)
                     for group in os.environ['SLURM_JOB_CPUS_PER_NODE'].split(","))
    except (KeyError, ValueError) as err:
        raise IOError(message) from err
    if nnodes <= 0 or ncores <= 0:
        raise IOError(message)
    return {'NNodes': nnodes, 'NCores': ncores}
def _MPICommandName():
    """Return the MPI launcher executable name.

    The SIMEX_MPICOMMAND environment variable, when present, overrides the
    default launcher 'mpirun'.
    """
    return os.environ.get('SIMEX_MPICOMMAND', 'mpirun')
def _getParallelResourceInfoFromMpirun():
    """Probe the parallel resources by running '<mpi command> hostname'.

    The launcher prints the host name of every node where an MPI task starts.
    A node can appear several times (depending on the MPI vendor), which gives
    the number of cores available to the launcher on that node. The number of
    distinct host names is reported as 'NNodes' and the total number of lines
    as 'NCores'.

    :return: Dictionary with 'NNodes' and 'NCores', or None if the launcher
        could not be run, exited with an error, or printed no host names.
    :rtype: dict or None
    """
    mpicmd = _MPICommandName()
    try:
        # stderr is captured separately so launcher warnings are not mistaken
        # for host names and do not inflate the counts.
        process = subprocess.Popen([mpicmd, "hostname"], stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        output, _ = process.communicate()
        text = output.decode('utf-8')
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a missing launcher or undecodable output means "unknown".
        return None
    if process.returncode != 0:
        return None
    nodes = [line.strip() for line in text.splitlines() if line.strip()]
    if not nodes:
        return None
    return {'NNodes': len(set(nodes)), 'NCores': len(nodes)}
def getParallelResourceInfo():
    """
    Utility extract information about available parallel resources.

    Sources are tried in this order:
      1. SIMEX_NNODES / SIMEX_NCORES environment variables (if both are set),
      2. SLURM_JOB_NUM_NODES / SLURM_JOB_CPUS_PER_NODE (if both are set),
      3. probing the MPI launcher with 'hostname'.
    If all of them fail, a serial-mode fallback is returned.

    @return : The dictionary expected by downstream simex modules, with the
              integer entries 'NCores' and 'NNodes'.
    @rtype : dict
    @raise IOError: If the SIMEX_* or SLURM_* variables are set but invalid.
    """
    if 'SIMEX_NNODES' in os.environ and 'SIMEX_NCORES' in os.environ:
        return _getParallelResourceInfoFromEnv()
    if 'SLURM_JOB_NUM_NODES' in os.environ and 'SLURM_JOB_CPUS_PER_NODE' in os.environ:
        return _getParallelResourceInfoFromSlurm()
    resource = _getParallelResourceInfoFromMpirun()
    if resource is not None:
        return resource
    print("Was unable to determine parallel resources, will run in serial mode")
    # NOTE(review): serial mode reports NCores=0; presumably downstream code
    # treats 0 as "no MPI parallelism" -- confirm against the callers.
    return {"NCores": 0, "NNodes": 1}
def _getMPIVersionInfo():
    """Identify the MPI implementation behind the configured launcher.

    Runs '<mpi command> --version' and recognises
      * Open MPI: the output contains '(Open MPI) <version>',
      * MPICH:    the output contains 'HYDRA' and a 'Version: <version>' line.

    :return: {'Vendor': 'OpenMPI' or 'MPICH', 'Version': <version string>}, or
        None if the launcher could not be run or its output was not recognised.
    :rtype: dict or None
    """
    mpi_cmd = _MPICommandName()
    try:
        process = subprocess.Popen([mpi_cmd, "--version"], stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        output, _ = process.communicate()
    except (OSError, subprocess.SubprocessError):
        # Best effort: the caller turns None into a descriptive error.
        return None
    # Only substring matching follows, so undecodable bytes must not abort it.
    output = output.decode('utf-8', errors='replace')
    if "(Open MPI)" in output:
        return {'Vendor': "OpenMPI",
                'Version': output.split("(Open MPI)")[1].split('\n')[0].strip()}
    if "HYDRA" in output and "Version:" in output:
        return {'Vendor': "MPICH",
                'Version': output.split("Version:")[1].split('\n')[0].strip()}
    return None
def _mpiVersionTuple(version_string):
    """Return the leading dotted numeric part of version_string as a tuple of ints.

    Release suffixes are ignored ('4.0.3rc1' -> (4, 0, 3)) and the result is
    zero-padded to at least three components ('1.8' -> (1, 8, 0)), so tuples
    compare like release numbers.

    :raises ValueError: If version_string does not start with a number.
    """
    import re
    match = re.match(r'\s*(\d+(?:\.\d+)*)', version_string)
    if match is None:
        raise ValueError("invalid MPI version number '%s'" % version_string)
    numbers = [int(part) for part in match.group(1).split('.')]
    numbers += [0] * (3 - len(numbers))
    return tuple(numbers)


def _getVendorSpecificMPIArguments(version, threads_per_task):
    """Build the vendor specific launcher options.

    :param version: {'Vendor': ..., 'Version': ...} as returned by the MPI
        version probe, or None if the vendor is unknown.
    :type version: dict or None
    :param threads_per_task: Number of OpenMP threads per task; values <= 0
        leave OMP_NUM_THREADS unset.
    :type threads_per_task: int
    :return: Option string starting with a space (empty for unknown vendors).
    :rtype: str
    :raises IOError: If version is None.
    :raises ValueError: If an Open MPI version string does not start with a number.
    """
    if version is None:
        raise IOError("Could not determine MPI vendor/version. Set SIMEX_MPICOMMAND or "
                      "provide backengine_mpicommand calculator parameter")
    mpi_args = ""
    # Mapping by node is required to distribute tasks in round-robin mode.
    if version['Vendor'] == "OpenMPI":
        if _mpiVersionTuple(version['Version']) > (1, 8, 0):
            mpi_args += " --map-by node --bind-to none"
        else:
            # Releases up to 1.8.0 get the older --bynode spelling.
            mpi_args += " --bynode"
        # By default, all cores will be available, no need to set OMP_NUM_THREADS.
        if threads_per_task > 0:
            mpi_args += " -x OMP_NUM_THREADS=" + str(threads_per_task)
        mpi_args += " -x OMPI_MCA_mpi_warn_on_fork=0 -x OMPI_MCA_btl_base_warn_component_unused=0"
    elif version['Vendor'] == "MPICH":
        mpi_args += " -map-by node"
        if threads_per_task > 0:
            mpi_args += " -env OMP_NUM_THREADS " + str(threads_per_task)
    return mpi_args
def prepareMPICommandArguments(ntasks, threads_per_task=0):
    """
    Utility prepares mpi arguments based on mpi version found in the system.

    The result is '<mpi command> -np <ntasks>' followed by the vendor specific
    mapping/threading options and, if set, the contents of the
    SIMEX_EXTRA_MPI_PARAMETERS environment variable.

    :param ntasks: Number of MPI tasks (must not be negative)
    :type ntasks: int
    :param threads_per_task: Number of threads per task (values <= 0 leave
        OMP_NUM_THREADS unset)
    :type threads_per_task: int
    @return : String with mpi command and arguments
    @rtype : string
    @raise IOError: If ntasks is negative or the MPI vendor cannot be determined.
    """
    if ntasks < 0:
        raise IOError("number of tasks should be non-negative")
    mpi_cmd = _MPICommandName() + " -np " + str(ntasks)
    mpi_cmd += _getVendorSpecificMPIArguments(_getMPIVersionInfo(), threads_per_task)
    extra_parameters = os.environ.get('SIMEX_EXTRA_MPI_PARAMETERS')
    if extra_parameters is not None:
        mpi_cmd += " " + extra_parameters
    return mpi_cmd
def getCUDAEnvironment(max_memory_usage_fraction=0.1):
    """Get the CUDA runtime environment parameters (number of cards etc.).

    Queries NVML (through py3nvml) for the number of CUDA devices and selects
    the first device whose used memory is at most max_memory_usage_fraction of
    its total memory.

    :param max_memory_usage_fraction: Largest used/total memory ratio for which
        a device still counts as available (default 0.1, i.e. 10 %).
    :type max_memory_usage_fraction: float
    :return: Dictionary with 'first_available_device_index' (int or None) and
        'device_count' (int). Both keep their defaults (None, 0) when NVML
        cannot be initialised or the device count cannot be queried.
    :rtype: dict
    """
    rdict = {'first_available_device_index': None, 'device_count': 0}
    warning = ('WARNING: At least one of (py3nvml.nvml, CUDA) is not available. '
               'Will continue without GPU.')
    try:
        nvml.nvmlInit()
    except Exception:
        # Best effort: no NVML/CUDA simply means "no GPU".
        print(warning)
        return rdict
    # NVML is initialised from here on and must be shut down on every exit path.
    try:
        try:
            rdict['device_count'] = nvml.nvmlDeviceGetCount()
        except Exception:
            print(warning)
            return rdict
        for index in range(rdict['device_count']):
            memory_info = nvml.nvmlDeviceGetMemoryInfo(nvml.nvmlDeviceGetHandleByIndex(index))
            # Guard against a zero total to avoid a ZeroDivisionError.
            if memory_info.total > 0 and \
                    memory_info.used / memory_info.total <= max_memory_usage_fraction:
                rdict['first_available_device_index'] = index
                break
    finally:
        nvml.nvmlShutdown()
    return rdict